You are viewing a plain-text version of this content. The canonical link to it is on the original mailing-list archive page (the hyperlink is not preserved in this plain-text version).
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2012/11/16 08:57:45 UTC

svn commit: r1410236 - in /manifoldcf/branches/CONNECTORS-120: connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/ framework/core/src/main/java/org/apache/manifoldcf/core/common/

Author: kwright
Date: Fri Nov 16 07:57:44 2012
New Revision: 1410236

URL: http://svn.apache.org/viewvc?rev=1410236&view=rev
Log:
Finish initial code for port of RSS connector to httpcomponents

Modified:
    manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/IThrottledConnection.java
    manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/RSSConnector.java
    manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/Robots.java
    manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/ThrottledFetcher.java
    manifoldcf/branches/CONNECTORS-120/framework/core/src/main/java/org/apache/manifoldcf/core/common/XThreadInputStream.java

Modified: manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/IThrottledConnection.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/IThrottledConnection.java?rev=1410236&r1=1410235&r2=1410236&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/IThrottledConnection.java (original)
+++ manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/IThrottledConnection.java Fri Nov 16 07:57:44 2012
@@ -67,7 +67,6 @@ public interface IThrottledConnection
   * @return the status code: success, static error, or dynamic error.
   */
   public int executeFetch(String protocol, int port, String urlPath, String userAgent, String from,
-    String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword,
     String lastETag, String lastModified)
     throws ManifoldCFException, ServiceInterruption;
 

Modified: manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/RSSConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/RSSConnector.java?rev=1410236&r1=1410235&r2=1410236&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/RSSConnector.java (original)
+++ manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/RSSConnector.java Fri Nov 16 07:57:44 2012
@@ -974,7 +974,12 @@ public class RSSConnector extends org.ap
                 maxOpenConnectionsPerServer,
                 minimumMillisecondsPerFetchPerServer,
                 connectionLimit,
-                feedTimeout);
+                feedTimeout,
+                proxyHost,
+                proxyPort,
+                proxyAuthDomain,
+                proxyAuthUsername,
+                proxyAuthPassword);
               try
               {
                 // Begin the fetch
@@ -983,8 +988,7 @@ public class RSSConnector extends org.ap
                 {
                   // Execute the request.
                   // Use the connect timeout from the document specification!
-                  int status = connection.executeFetch(protocol,port,pathPart,userAgent,from,proxyHost,proxyPort,
-                    proxyAuthDomain,proxyAuthUsername,proxyAuthPassword,
+                  int status = connection.executeFetch(protocol,port,pathPart,userAgent,from,
                     lastETagValue,lastModifiedValue);
                   switch (status)
                   {

Modified: manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/Robots.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/Robots.java?rev=1410236&r1=1410235&r2=1410236&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/Robots.java (original)
+++ manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/Robots.java Fri Nov 16 07:57:44 2012
@@ -446,14 +446,15 @@ public class Robots
 
       // Do the fetch
       IThrottledConnection connection = fetcher.createConnection(hostName,minimumMillisecondsPerBytePerServer,
-        maxOpenConnectionsPerServer,minimumMillisecondsPerFetchPerServer,connectionLimit,ROBOT_TIMEOUT_MILLISECONDS);
+        maxOpenConnectionsPerServer,minimumMillisecondsPerFetchPerServer,connectionLimit,ROBOT_TIMEOUT_MILLISECONDS,
+        proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
       try
       {
         connection.beginFetch(ROBOT_CONNECTION_TYPE);
         try
         {
           int responseCode = connection.executeFetch(protocol,port,ROBOT_FILE_NAME,userAgent,from,
-            proxyHost, proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword,null,null);
+            null,null);
           switch (responseCode)
           {
           case IThrottledConnection.STATUS_OK:

Modified: manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/ThrottledFetcher.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/ThrottledFetcher.java?rev=1410236&r1=1410235&r2=1410236&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/ThrottledFetcher.java (original)
+++ manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/ThrottledFetcher.java Fri Nov 16 07:57:44 2012
@@ -19,6 +19,7 @@
 package org.apache.manifoldcf.crawler.connectors.rss;
 
 import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.core.common.XThreadInputStream;
 import org.apache.manifoldcf.agents.interfaces.*;
 import org.apache.manifoldcf.crawler.interfaces.*;
 import org.apache.manifoldcf.crawler.system.Logging;
@@ -26,11 +27,34 @@ import org.apache.manifoldcf.crawler.sys
 import java.util.*;
 import java.io.*;
 
-import org.apache.commons.httpclient.*;
-import org.apache.commons.httpclient.methods.*;
-import org.apache.commons.httpclient.params.*;
-import org.apache.commons.httpclient.protocol.*;
-import org.apache.commons.httpclient.auth.*;
+import org.apache.http.conn.ClientConnectionManager;
+import org.apache.http.client.HttpClient;
+import org.apache.http.impl.conn.PoolingClientConnectionManager;
+import org.apache.http.conn.scheme.Scheme;
+import org.apache.http.conn.ssl.SSLSocketFactory;
+import org.apache.http.conn.ssl.BrowserCompatHostnameVerifier;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.NTCredentials;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.impl.client.DefaultRedirectStrategy;
+import org.apache.http.util.EntityUtils;
+import org.apache.http.params.BasicHttpParams;
+import org.apache.http.params.HttpParams;
+import org.apache.http.params.CoreConnectionPNames;
+import org.apache.http.HttpStatus;
+import org.apache.http.HttpHost;
+import org.apache.http.Header;
+import org.apache.http.conn.params.ConnRoutePNames;
+import org.apache.http.message.BasicHeader;
+
+import org.apache.http.conn.ConnectTimeoutException;
+import org.apache.http.client.CircularRedirectException;
+import org.apache.http.NoHttpResponseException;
+import org.apache.http.HttpException;
 
 /** This class uses httpclient to fetch stuff from webservers.  However, it additionally controls the fetch
 * rate in two ways: first, controlling the overall bandwidth used per server, and second, limiting the number
@@ -137,7 +161,8 @@ public class ThrottledFetcher
   * @param connectionTimeoutMilliseconds is the number of milliseconds to wait for the connection before timing out.
   */
   public synchronized IThrottledConnection createConnection(String serverName, double minimumMillisecondsPerBytePerServer,
-    int maxOpenConnectionsPerServer, long minimumMillisecondsPerFetchPerServer, int connectionLimit, int connectionTimeoutMilliseconds)
+    int maxOpenConnectionsPerServer, long minimumMillisecondsPerFetchPerServer, int connectionLimit, int connectionTimeoutMilliseconds,
+    String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
     throws ManifoldCFException, ServiceInterruption
   {
     Server server;
@@ -148,7 +173,9 @@ public class ThrottledFetcher
       serverMap.put(serverName,server);
     }
 
-    return new ThrottledConnection(server,minimumMillisecondsPerBytePerServer,maxOpenConnectionsPerServer,minimumMillisecondsPerFetchPerServer,connectionTimeoutMilliseconds,connectionLimit);
+    return new ThrottledConnection(server,minimumMillisecondsPerBytePerServer,maxOpenConnectionsPerServer,minimumMillisecondsPerFetchPerServer,
+      connectionTimeoutMilliseconds,connectionLimit,
+      proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
   }
 
   /** Poll.  This method is designed to allow idle connections to be closed and freed.
@@ -185,236 +212,6 @@ public class ThrottledFetcher
     }
   }
 
-  // Where to record result info/file data
-  protected final static String resultLogFile = "/common/rss/resultlog";
-  // Where to record the actual data
-  protected final static String dataFileFolder = "/common/rss/data/";
-
-  // This is the one instance of the output class
-  protected static DataRecorder dataRecorder = new DataRecorder();
-
-  /** This class takes care of recording data and results for posterity */
-  protected static class DataRecorder
-  {
-    protected int documentNumber = 0;
-    protected long startTime = System.currentTimeMillis();
-    protected boolean initialized = false;
-
-    public DataRecorder()
-    {
-    }
-
-    protected String readFile(File f)
-      throws IOException
-    {
-      InputStream is = new FileInputStream(f);
-      try
-      {
-        Reader r = new InputStreamReader(is);
-        try
-        {
-          char[] characterBuf = new char[32];
-          int amt = r.read(characterBuf);
-          String rval = new String(characterBuf,0,amt);
-          return rval;
-        }
-        finally
-        {
-          r.close();
-        }
-      }
-      finally
-      {
-        is.close();
-      }
-    }
-
-    protected void writeFile(File f, String data)
-      throws IOException
-    {
-      OutputStream os = new FileOutputStream(f);
-      try
-      {
-        Writer w = new OutputStreamWriter(os);
-        try
-        {
-          w.write(data);
-        }
-        finally
-        {
-          w.flush();
-        }
-      }
-      finally
-      {
-        os.close();
-      }
-    }
-
-    protected synchronized void initializeParameters()
-    {
-      if (initialized)
-        return;
-
-      // Create folder, if it doesn't yet exist
-      try
-      {
-        new File(dataFileFolder).mkdirs();
-      }
-      catch (Exception e)
-      {
-        e.printStackTrace();
-      }
-
-      // Either read starting timestamp, or write it
-      File timestampFile = new File("/common/rss/timestamp.log");
-      if (timestampFile.exists())
-      {
-        try
-        {
-          String data = readFile(timestampFile);
-          startTime = new Long(data).longValue();
-        }
-        catch (IOException e)
-        {
-          e.printStackTrace();
-        }
-      }
-      else
-      {
-        try
-        {
-          writeFile(timestampFile,new Long(startTime).toString());
-        }
-        catch (IOException e)
-        {
-          e.printStackTrace();
-        }
-      }
-
-      // Read starting document number, if it exists
-      File documentNumberFile = new File("/common/rss/docnumber.log");
-      if (documentNumberFile.exists())
-      {
-        try
-        {
-          String data = readFile(documentNumberFile);
-          documentNumber = Integer.parseInt(data);
-        }
-        catch (Exception e)
-        {
-          e.printStackTrace();
-        }
-      }
-
-      initialized = true;
-    }
-
-    public DataSession getSession(String url)
-      throws ManifoldCFException
-    {
-      initializeParameters();
-      return new DataSession(this,url);
-    }
-
-    /** Atomically write resultlog record, returning data file name to use */
-    public synchronized String writeResponseRecord(String url, int responseCode, ArrayList headerNames, ArrayList headerValues)
-      throws ManifoldCFException
-    {
-      // Open log file
-      try
-      {
-        OutputStream os = new FileOutputStream(resultLogFile,true);
-        try
-        {
-          OutputStreamWriter writer = new OutputStreamWriter(os,"utf-8");
-          try
-          {
-            String documentName = Integer.toString(documentNumber++);
-            writer.write("Time: "+new Long(System.currentTimeMillis()-startTime).toString()+"\n");
-            writer.write("URI: "+url+"\n");
-            writer.write("File: "+documentName+"\n");
-            writer.write("Code: "+Integer.toString(responseCode)+"\n");
-            int i = 0;
-            while (i < headerNames.size())
-            {
-              writer.write("Header: "+(String)headerNames.get(i)+":"+(String)headerValues.get(i)+"\n");
-              i++;
-            }
-            writeFile(new File("/common/rss/docnumber.log"),new Integer(documentNumber).toString());
-            return documentName;
-          }
-          finally
-          {
-            writer.close();
-          }
-        }
-        finally
-        {
-          os.close();
-        }
-      }
-      catch (IOException e)
-      {
-        throw new ManifoldCFException("Error recording file info: "+e.getMessage(),e);
-      }
-
-    }
-
-
-  }
-
-  /** Helper class for the above */
-  protected static class DataSession
-  {
-    protected DataRecorder dr;
-    protected String url;
-    protected int responseCode = 0;
-    protected ArrayList headerNames = new ArrayList();
-    protected ArrayList headerValues = new ArrayList();
-    protected String documentName = null;
-
-    public DataSession(DataRecorder dr, String url)
-    {
-      this.dr = dr;
-      this.url = url;
-    }
-
-    public void setResponseCode(int responseCode)
-    {
-      this.responseCode = responseCode;
-    }
-
-    public void addHeader(String headerName, String headerValue)
-    {
-      headerNames.add(headerName);
-      headerValues.add(headerValue);
-    }
-
-    public void endHeader()
-      throws ManifoldCFException
-    {
-      documentName = dr.writeResponseRecord(url,responseCode,headerNames,headerValues);
-    }
-
-    public void write(byte[] theBytes, int off, int length)
-      throws IOException
-    {
-      if (documentName == null)
-        throw new IOException("Must end header before reading data!");
-      OutputStream os = new FileOutputStream(dataFileFolder+documentName,true);
-      try
-      {
-        os.write(theBytes,off,length);
-      }
-      finally
-      {
-        os.close();
-      }
-    }
-
-  }
-
   /** This class represents an established connection to a URL.
   */
   protected static class ThrottledConnection implements IThrottledConnection
@@ -428,7 +225,7 @@ public class ThrottledFetcher
     /** The server object we use to track connections and fetches. */
     protected Server server;
     /** The method object */
-    protected HttpMethodBase fetchMethod = null;
+    protected HttpRequestBase executeMethod = null;
     /** The start-fetch time */
     protected long startFetchTime = -1L;
     /** The error trace, if any */
@@ -441,18 +238,23 @@ public class ThrottledFetcher
     protected String fetchType = null;
     /** The current bytes in the current fetch */
     protected long fetchCounter = 0L;
-    /** The connection pool (max size 1) */
-    protected MultiThreadedHttpConnectionManager connectionManager = null;
     /** Connection timeout in milliseconds */
     protected int connectionTimeoutMilliseconds;
 
-    /** Hack added to record all access data from current crawler */
-    protected DataSession dataSession = null;
-
+    /** The thread that is actually doing the work */
+    protected ExecuteMethodThread methodThread = null;
+    /** Set if thread has been started */
+    protected boolean threadStarted = false;
+    /** The client connection manager */
+    protected ClientConnectionManager connectionManager = null;
+    /** The httpclient */
+    protected HttpClient httpClient = null;
+    
     /** Constructor.
     */
     public ThrottledConnection(Server server, double minimumMillisecondsPerBytePerServer, int maxOpenConnectionsPerServer,
-      long minimumMillisecondsPerFetchPerServer, int connectionTimeoutMilliseconds, int connectionLimit)
+      long minimumMillisecondsPerFetchPerServer, int connectionTimeoutMilliseconds, int connectionLimit,
+      String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
       throws ManifoldCFException
     {
       this.minimumMillisecondsPerBytePerServer = minimumMillisecondsPerBytePerServer;
@@ -460,11 +262,42 @@ public class ThrottledFetcher
       this.minimumMillisecondsPerFetchPerServer = minimumMillisecondsPerFetchPerServer;
       this.server = server;
       this.connectionTimeoutMilliseconds = connectionTimeoutMilliseconds;
-      connectionManager = new MultiThreadedHttpConnectionManager();
-      HttpConnectionManagerParams httpConParam = connectionManager.getParams();
-      httpConParam.setMaxTotalConnections(1);
-      httpConParam.setConnectionTimeout(connectionTimeoutMilliseconds);
-      httpConParam.setSoTimeout(connectionTimeoutMilliseconds);
+      PoolingClientConnectionManager localConnectionManager = new PoolingClientConnectionManager();
+      localConnectionManager.setMaxTotal(1);
+      connectionManager = localConnectionManager;
+      
+      BasicHttpParams params = new BasicHttpParams();
+      params.setBooleanParameter(CoreConnectionPNames.TCP_NODELAY,true);
+      params.setBooleanParameter(CoreConnectionPNames.STALE_CONNECTION_CHECK,false);
+      params.setIntParameter(CoreConnectionPNames.SO_TIMEOUT,connectionTimeoutMilliseconds);
+      params.setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT,connectionTimeoutMilliseconds);
+      DefaultHttpClient localHttpClient = new DefaultHttpClient(connectionManager,params);
+      localHttpClient.setRedirectStrategy(new DefaultRedirectStrategy());
+      
+      // If there's a proxy, set that too.
+      if (proxyHost != null && proxyHost.length() > 0)
+      {
+
+        // Configure proxy authentication
+        if (proxyAuthUsername != null && proxyAuthUsername.length() > 0)
+        {
+          if (proxyAuthPassword == null)
+            proxyAuthPassword = "";
+          if (proxyAuthDomain == null)
+            proxyAuthDomain = "";
+
+          localHttpClient.getCredentialsProvider().setCredentials(
+            new AuthScope(proxyHost, proxyPort),
+            new NTCredentials(proxyAuthUsername, proxyAuthPassword, currentHost, proxyAuthDomain));
+        }
+
+        HttpHost proxy = new HttpHost(proxyHost, proxyPort);
+
+        httpClient.getParams().setParameter(ConnRoutePNames.DEFAULT_PROXY, proxy);
+      }
+      
+      httpClient = localHttpClient;
+
       registerGlobalHandle(connectionLimit);
       server.registerConnection(maxOpenConnectionsPerServer);
     }
@@ -486,6 +319,7 @@ public class ThrottledFetcher
       {
         throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
       }
+      threadStarted = false;
     }
 
     /** Log the fetch of a number of bytes. */
@@ -494,46 +328,6 @@ public class ThrottledFetcher
       fetchCounter += (long)count;
     }
 
-    protected static class ExecuteMethodThread extends Thread
-    {
-      protected HttpClient client;
-      protected HostConfiguration hostConfiguration;
-      protected HttpMethodBase executeMethod;
-      protected Throwable exception = null;
-      protected int rval = 0;
-
-      public ExecuteMethodThread(HttpClient client, HostConfiguration hostConfiguration, HttpMethodBase executeMethod)
-      {
-        super();
-        setDaemon(true);
-        this.client = client;
-        this.hostConfiguration = hostConfiguration;
-        this.executeMethod = executeMethod;
-      }
-
-      public void run()
-      {
-        try
-        {
-          // Call the execute method appropriately
-          rval = client.executeMethod(hostConfiguration,executeMethod);
-        }
-        catch (Throwable e)
-        {
-          this.exception = e;
-        }
-      }
-
-      public Throwable getException()
-      {
-        return exception;
-      }
-
-      public int getResponse()
-      {
-        return rval;
-      }
-    }
 
     /** Execute the fetch and get the return code.  This method uses the
     * standard logging mechanism to keep track of the fetch attempt.  It also
@@ -556,7 +350,6 @@ public class ThrottledFetcher
     * @return the status code: success, static error, or dynamic error.
     */
     public int executeFetch(String protocol, int port, String urlPath, String userAgent, String from,
-      String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword,
       String lastETag, String lastModified)
       throws ManifoldCFException, ServiceInterruption
     {
@@ -568,75 +361,31 @@ public class ThrottledFetcher
       sb.append(urlPath);
       myUrl = sb.toString();
 
-      if (recordEverything)
-        // Start a new data session
-        dataSession = dataRecorder.getSession(myUrl);
-
+      // Create the get method
+      executeMethod = new HttpGet(myUrl);
+      
+      startFetchTime = System.currentTimeMillis();
+
+      // Set all appropriate headers
+      executeMethod.setHeader(new BasicHeader("User-Agent",userAgent));
+      executeMethod.setHeader(new BasicHeader("From",from));
+      if (lastETag != null)
+        executeMethod.setHeader(new BasicHeader("ETag",lastETag));
+      if (lastModified != null)
+        executeMethod.setHeader(new BasicHeader("Last-Modified",lastModified));
+      // Create the execution thread.
+      methodThread = new ExecuteMethodThread(this, server,
+        minimumMillisecondsPerBytePerServer, httpClient, executeMethod);
+      // Start the method thread, which will start the transaction
       try
       {
-        HttpClient client = new HttpClient(connectionManager);
-        // Permit circular redirections, because that is how some sites set cookies
-        client.getParams().setParameter(org.apache.commons.httpclient.params.HttpClientParams.ALLOW_CIRCULAR_REDIRECTS,new Boolean(true));
-        fetchMethod = new GetMethod();
-        HostConfiguration config = new HostConfiguration();
-
-        config.setHost(server.getServerName(),port,Protocol.getProtocol(protocol));
-
-        // If there's a proxy, set that too.
-        if (proxyHost != null && proxyHost.length() > 0)
-        {
-          config.setProxy(proxyHost,proxyPort);
-          if (proxyAuthUsername != null && proxyAuthUsername.length() > 0)
-          {
-            if (proxyAuthPassword == null)
-              proxyAuthPassword = "";
-            if (proxyAuthDomain == null)
-              proxyAuthDomain = "";
-            // Set up NTLM credentials for this fetch too.
-            client.getState().setProxyCredentials(AuthScope.ANY,
-              new NTCredentials(proxyAuthUsername,proxyAuthPassword,currentHost,proxyAuthDomain));
-          }
-        }
-
-        startFetchTime = System.currentTimeMillis();
-        fetchMethod.setURI(new URI(urlPath,true));
-
-        // Set all appropriate headers
-        fetchMethod.setRequestHeader("User-Agent",userAgent);
-        fetchMethod.setRequestHeader("From",from);
-        if (lastETag != null)
-          fetchMethod.setRequestHeader("ETag",lastETag);
-        if (lastModified != null)
-          fetchMethod.setRequestHeader("Last-Modified",lastModified);
-
-        fetchMethod.getParams().setSoTimeout(connectionTimeoutMilliseconds);
-        // fetchMethod.getParams().setIntParameter("http.socket.timeout", connectionTimeoutMilliseconds);
-
-        fetchMethod.setFollowRedirects(true);
-
-        // Fire it off!
+        methodThread.start();
+        threadStarted = true;
+        // We want to wait until at least the execution has fired, and then figure out where we
+        // stand
         try
         {
-          ExecuteMethodThread t = new ExecuteMethodThread(client,config,fetchMethod);
-          try
-          {
-            t.start();
-            t.join();
-            Throwable thr = t.getException();
-            if (thr != null)
-            {
-              throw thr;
-            }
-            statusCode = t.getResponse();
-            if (recordEverything)
-              dataSession.setResponseCode(statusCode);
-          }
-          catch (InterruptedException e)
-          {
-            t.interrupt();
-            // We need the caller to abandon any connections left around, so rethrow in a way that forces them to process the event properly.
-            throw e;
-          }
+          int statusCode = methodThread.getResponseCode();
           long currentTime;
           switch (statusCode)
           {
@@ -668,116 +417,96 @@ public class ThrottledFetcher
           default:
             return STATUS_PAGEERROR;
           }
-
-        }
-        catch (java.net.SocketTimeoutException e)
-        {
-          throwable = e;
-          long currentTime = System.currentTimeMillis();
-          throw new ServiceInterruption("Timed out waiting for IO for '"+myUrl+"': "+e.getMessage(), e, currentTime + 300000L,
-            currentTime + 120L * 60000L,-1,false);
-        }
-        catch (org.apache.commons.httpclient.ConnectTimeoutException e)
-        {
-          throwable = e;
-          long currentTime = System.currentTimeMillis();
-          throw new ServiceInterruption("Timed out waiting for connect for '"+myUrl+"': "+e.getMessage(), e, currentTime + 60L * 60000L,
-            currentTime + 720L * 60000L,-1,false);
-        }
-        catch (InterruptedIOException e)
-        {
-          throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
         }
-        catch (org.apache.commons.httpclient.CircularRedirectException e)
+        catch (InterruptedException e)
         {
-          throwable = e;
-          statusCode = FETCH_CIRCULAR_REDIRECT;
-          if (recordEverything)
-            dataSession.setResponseCode(statusCode);
-          return STATUS_PAGEERROR;
-        }
-        catch (org.apache.commons.httpclient.NoHttpResponseException e)
-        {
-          throwable = e;
-          // Give up after 2 hours.
-          long currentTime = System.currentTimeMillis();
-          throw new ServiceInterruption("Timed out waiting for response for '"+myUrl+"'", e, currentTime + 15L * 60000L,
-            currentTime + 120L * 60000L,-1,false);
-        }
-        catch (java.net.ConnectException e)
-        {
-          throwable = e;
-          // Give up after 6 hours.
-          long currentTime = System.currentTimeMillis();
-          throw new ServiceInterruption("Timed out waiting for a connection for '"+myUrl+"'", e, currentTime + 1000000L,
-            currentTime + 720L * 60000L,-1,false);
-        }
-        catch (java.net.NoRouteToHostException e)
-        {
-          // This exception means we know the IP address but can't get there.  That's either a firewall issue, or it's something transient
-          // with the network.  Some degree of retry is probably wise.
-          throwable = e;
-          long currentTime = System.currentTimeMillis();
-          throw new ServiceInterruption("No route to host for '"+myUrl+"'", e, currentTime + 1000000L,
-            currentTime + 720L * 60000L,-1,false);
-        }
-        catch (IOException e)
-        {
-          // Treat this as a bad url.  We don't know what happened, but it isn't something we are going to naively
-          // retry on.
-          throwable = e;
-          statusCode = FETCH_IO_ERROR;
-          if (recordEverything)
-            dataSession.setResponseCode(statusCode);
-          return STATUS_PAGEERROR;
+          methodThread.interrupt();
+          methodThread = null;
+          threadStarted = false;
+          throw e;
         }
 
       }
       catch (InterruptedException e)
       {
         // Drop the current connection on the floor, so it cannot be reused.
-        fetchMethod = null;
+        executeMethod = null;
         throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED);
       }
-      catch (URIException e)
+      catch (java.net.MalformedURLException e)
       {
         throwable = new ManifoldCFException("Illegal URI: '"+myUrl+"'",e);
         statusCode = FETCH_BAD_URI;
-        if (recordEverything)
-          dataSession.setResponseCode(statusCode);
         return STATUS_PAGEERROR;
       }
-      catch (IllegalArgumentException e)
+      catch (java.net.SocketTimeoutException e)
       {
-        throwable = new ManifoldCFException("Illegal URI: '"+myUrl+"'",e);
-        statusCode = FETCH_BAD_URI;
-        if (recordEverything)
-          dataSession.setResponseCode(statusCode);
-        return STATUS_PAGEERROR;
+        throwable = e;
+        long currentTime = System.currentTimeMillis();
+        throw new ServiceInterruption("Timed out waiting for IO for '"+myUrl+"': "+e.getMessage(), e, currentTime + 300000L,
+          currentTime + 120L * 60000L,-1,false);
+      }
+      catch (ConnectTimeoutException e)
+      {
+        throwable = e;
+        long currentTime = System.currentTimeMillis();
+        throw new ServiceInterruption("Timed out waiting for connect for '"+myUrl+"': "+e.getMessage(), e, currentTime + 60L * 60000L,
+          currentTime + 720L * 60000L,-1,false);
       }
-      catch (IllegalStateException e)
+      catch (InterruptedIOException e)
       {
-        throwable = new ManifoldCFException("Illegal state while fetching URI: '"+myUrl+"'",e);
-        statusCode = FETCH_SEQUENCE_ERROR;
-        if (recordEverything)
-          dataSession.setResponseCode(statusCode);
+        throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
+      }
+      catch (CircularRedirectException e)
+      {
+        throwable = e;
+        statusCode = FETCH_CIRCULAR_REDIRECT;
         return STATUS_PAGEERROR;
       }
-      catch (ServiceInterruption e)
+      catch (NoHttpResponseException e)
+      {
+        throwable = e;
+        // Give up after 2 hours.
+        long currentTime = System.currentTimeMillis();
+        throw new ServiceInterruption("Timed out waiting for response for '"+myUrl+"'", e, currentTime + 15L * 60000L,
+          currentTime + 120L * 60000L,-1,false);
+      }
+      catch (java.net.ConnectException e)
       {
-        throw e;
+        throwable = e;
+        // Give up after 6 hours.
+        long currentTime = System.currentTimeMillis();
+        throw new ServiceInterruption("Timed out waiting for a connection for '"+myUrl+"'", e, currentTime + 1000000L,
+          currentTime + 720L * 60000L,-1,false);
       }
-      catch (ManifoldCFException e)
+      catch (java.net.NoRouteToHostException e)
       {
-        throw e;
+        // This exception means we know the IP address but can't get there.  That's either a firewall issue, or it's something transient
+        // with the network.  Some degree of retry is probably wise.
+        throwable = e;
+        long currentTime = System.currentTimeMillis();
+        throw new ServiceInterruption("No route to host for '"+myUrl+"'", e, currentTime + 1000000L,
+          currentTime + 720L * 60000L,-1,false);
+      }
+      catch (HttpException e)
+      {
+        throwable = e;
+        statusCode = FETCH_IO_ERROR;
+        return STATUS_PAGEERROR;
+      }
+      catch (IOException e)
+      {
+        // Treat this as a bad url.  We don't know what happened, but it isn't something we are going to naively
+        // retry on.
+        throwable = e;
+        statusCode = FETCH_IO_ERROR;
+        return STATUS_PAGEERROR;
       }
       catch (Throwable e)
       {
         Logging.connectors.debug("RSS: Caught an unexpected exception: "+e.getMessage(),e);
         throwable = e;
         statusCode = FETCH_UNKNOWN_ERROR;
-        if (recordEverything)
-          dataSession.setResponseCode(statusCode);
         return STATUS_PAGEERROR;
       }
     }
@@ -797,24 +526,65 @@ public class ThrottledFetcher
     public InputStream getResponseBodyStream()
       throws ManifoldCFException, ServiceInterruption
     {
-      if (fetchMethod == null)
-        throw new ManifoldCFException("Attempt to get a response when there is no method");
+      // Preconditions: a request must have been set up and its worker thread started.
+      if (executeMethod == null)
+        throw new ManifoldCFException("Attempt to get an input stream when there is no method");
+      if (methodThread == null || threadStarted == false)
+        throw new ManifoldCFException("Attempt to get an input stream when no method thread");
       try
       {
-        if (recordEverything)
-          dataSession.endHeader();
-        InputStream bodyStream = fetchMethod.getResponseBodyAsStream();
-        if (bodyStream == null)
-          throw new ManifoldCFException("Failed to set up body response stream");
-        return new ThrottledInputstream(this,server,bodyStream,minimumMillisecondsPerBytePerServer,dataSession);
+        // Blocks until the worker thread has set up the body stream or recorded a failure.
+        // May return null when the worker found no body stream.
+        // NOTE(review): getSafeInputStream() throws IllegalStateException if the request
+        // itself failed, so callers are expected to check the response code first.
+        return methodThread.getSafeInputStream();
       }
-      catch (IOException e)
+      catch (InterruptedException e)
+      {
+        // Propagate the interruption to the worker thread as well.
+        methodThread.interrupt();
+        throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED);
+      }
+      catch (java.net.SocketTimeoutException e)
+      {
+        // Give up after 2 hours.
+        long currentTime = System.currentTimeMillis();
+        throw new ServiceInterruption("Timed out waiting for IO for '"+myUrl+"': "+e.getMessage(), e, currentTime + 300000L,
+          currentTime + 120L * 60000L,-1,false);
+      }
+      catch (ConnectTimeoutException e)
+      {
+        // Give up after 12 hours.
+        long currentTime = System.currentTimeMillis();
+        throw new ServiceInterruption("Timed out waiting for connect for '"+myUrl+"': "+e.getMessage(), e, currentTime + 60L * 60000L,
+          currentTime + 720L * 60000L,-1,false);
+      }
+      catch (InterruptedIOException e)
+      {
+        // Any other interrupted I/O is treated as a thread interruption; stop the worker too.
+        methodThread.interrupt();
+        throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
+      }
+      catch (NoHttpResponseException e)
+      {
+        // Give up after 2 hours.
+        long currentTime = System.currentTimeMillis();
+        throw new ServiceInterruption("Timed out waiting for response for '"+myUrl+"'", e, currentTime + 15L * 60000L,
+          currentTime + 120L * 60000L,-1,false);
+      }
+      catch (java.net.ConnectException e)
+      {
+        // Give up after 12 hours (720 minutes).
+        long currentTime = System.currentTimeMillis();
+        throw new ServiceInterruption("Timed out waiting for a stream connection for '"+myUrl+"'", e, currentTime + 1000000L,
+          currentTime + 720L * 60000L,-1,false);
+      }
+      catch (java.net.NoRouteToHostException e)
+      {
+        // This exception means we know the IP address but can't get there.  That's either a firewall issue, or it's something transient
+        // with the network.  Some degree of retry is probably wise.
+        // Give up after 12 hours.
+        long currentTime = System.currentTimeMillis();
+        throw new ServiceInterruption("No route to host for '"+myUrl+"'", e, currentTime + 1000000L,
+          currentTime + 720L * 60000L,-1,false);
+      }
+      // Remaining HTTP/I-O failures are reported as ManifoldCFException, not ServiceInterruption.
+      catch (HttpException e)
       {
-        throw new ManifoldCFException("IO exception setting up response stream",e);
+        throw new ManifoldCFException("Http exception reading stream: "+e.getMessage(),e);
       }
-      catch (IllegalStateException e)
+      catch (IOException e)
       {
-        throw new ManifoldCFException("State error getting response body",e);
+        throw new ManifoldCFException("I/O exception reading stream: "+e.getMessage(),e);
       }
     }
 
@@ -825,12 +595,66 @@ public class ThrottledFetcher
     public String getResponseHeader(String headerName)
       throws ManifoldCFException, ServiceInterruption
     {
-      Header h = fetchMethod.getResponseHeader(headerName);
-      if (h == null)
-        return null;
-      if (recordEverything)
-        dataSession.addHeader(headerName,h.getValue());
-      return h.getValue();
+      // Preconditions: a request must have been set up and its worker thread started.
+      if (executeMethod == null)
+        throw new ManifoldCFException("Attempt to get a header when there is no method");
+      if (methodThread == null || threadStarted == false)
+        throw new ManifoldCFException("Attempt to get a header when no method thread");
+      try
+      {
+        // Blocks until the worker thread has a response (or a recorded failure).
+        // Returns the value of the first header with this name, or null if it is absent.
+        return methodThread.getFirstHeader(headerName);
+      }
+      catch (InterruptedException e)
+      {
+        // Propagate the interruption to the worker thread as well.
+        methodThread.interrupt();
+        throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED);
+      }
+      catch (java.net.SocketTimeoutException e)
+      {
+        // Give up after 2 hours.
+        long currentTime = System.currentTimeMillis();
+        throw new ServiceInterruption("Timed out waiting for IO for '"+myUrl+"': "+e.getMessage(), e, currentTime + 300000L,
+          currentTime + 120L * 60000L,-1,false);
+      }
+      catch (ConnectTimeoutException e)
+      {
+        // Give up after 12 hours.
+        long currentTime = System.currentTimeMillis();
+        throw new ServiceInterruption("Timed out waiting for connect for '"+myUrl+"': "+e.getMessage(), e, currentTime + 60L * 60000L,
+          currentTime + 720L * 60000L,-1,false);
+      }
+      catch (InterruptedIOException e)
+      {
+        // Any other interrupted I/O is treated as a thread interruption; stop the worker too.
+        methodThread.interrupt();
+        throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
+      }
+      catch (NoHttpResponseException e)
+      {
+        // Give up after 2 hours.
+        long currentTime = System.currentTimeMillis();
+        throw new ServiceInterruption("Timed out waiting for response for '"+myUrl+"'", e, currentTime + 15L * 60000L,
+          currentTime + 120L * 60000L,-1,false);
+      }
+      catch (java.net.ConnectException e)
+      {
+        // Give up after 12 hours (720 minutes).
+        long currentTime = System.currentTimeMillis();
+        throw new ServiceInterruption("Timed out waiting for a connection for '"+myUrl+"'", e, currentTime + 1000000L,
+          currentTime + 720L * 60000L,-1,false);
+      }
+      catch (java.net.NoRouteToHostException e)
+      {
+        // This exception means we know the IP address but can't get there.  That's either a firewall issue, or it's something transient
+        // with the network.  Some degree of retry is probably wise.
+        // Give up after 12 hours.
+        long currentTime = System.currentTimeMillis();
+        throw new ServiceInterruption("No route to host for '"+myUrl+"'", e, currentTime + 1000000L,
+          currentTime + 720L * 60000L,-1,false);
+      }
+      // Remaining HTTP/I-O failures are reported as ManifoldCFException, not ServiceInterruption.
+      catch (HttpException e)
+      {
+        throw new ManifoldCFException("Http exception reading response: "+e.getMessage(),e);
+      }
+      catch (IOException e)
+      {
+        throw new ManifoldCFException("I/O exception reading response: "+e.getMessage(),e);
+      }
     }
 
     /** Done with the fetch.  Call this when the fetch has been completed.  A log entry will be generated
@@ -839,8 +663,31 @@ public class ThrottledFetcher
     public void doneFetch(IVersionActivity activities)
       throws ManifoldCFException
     {
+      
       if (fetchType != null)
       {
+        // Abort the connection, if not already complete
+        try
+        {
+          methodThread.abort();
+        }
+        catch (InterruptedException e)
+        {
+          throw new ManifoldCFException(e.getMessage(),e,ManifoldCFException.INTERRUPTED);
+        }
+        catch (InterruptedIOException e)
+        {
+          throw new ManifoldCFException(e.getMessage(),e,ManifoldCFException.INTERRUPTED);
+        }
+        catch (HttpException e)
+        {
+          throw new ManifoldCFException("Unexpected Http exception: "+e.getMessage(),e);
+        }
+        catch (IOException e)
+        {
+          throw new ManifoldCFException("Unexpected IO exception: "+e.getMessage(),e);
+        }
+
         long endTime = System.currentTimeMillis();
         server.endFetch();
 
@@ -854,19 +701,31 @@ public class ThrottledFetcher
           if (Logging.connectors.isDebugEnabled())
             Logging.connectors.debug("RSS: Fetch exception for '"+myUrl+"'",throwable);
         }
-        // Clear out all the parameters
-        if (fetchMethod != null)
+        
+        // Shut down (join) the connection thread, if any, and if it started
+        if (threadStarted)
         {
           try
           {
-            fetchMethod.releaseConnection();
+            methodThread.finishUp();
+          }
+          catch (InterruptedException e)
+          {
+            throw new ManifoldCFException(e.getMessage(),e,ManifoldCFException.INTERRUPTED);
+          }
+          catch (HttpException e)
+          {
+            throw new ManifoldCFException("Unexpected HTTP exception: "+e.getMessage(),e);
           }
-          catch (IllegalStateException e)
+          catch (IOException e)
           {
-            // looks like the fetch method didn't have one, or it was already released.  Just eat the exception.
+            throw new ManifoldCFException("Unexpected IO exception: "+e.getMessage(),e);
           }
-          fetchMethod = null;
+          threadStarted = false;
+          methodThread = null;
         }
+        
+        executeMethod = null;
         throwable = null;
         startFetchTime = -1L;
         myUrl = null;
@@ -902,17 +761,14 @@ public class ThrottledFetcher
     /** The stream we are wrapping. */
     protected InputStream inputStream;
 
-    protected DataSession dataSession;
-
     /** Constructor.
+    * Wraps an existing stream; the data-session recording argument was removed in this change.
+    * @param connection the ThrottledConnection this stream reports to (kept as throttledConnection).
+    * @param server the Server object used for per-server throttling.
+    * @param is the underlying stream to wrap.
+    * @param minimumMillisecondsPerBytePerServer the bandwidth limit, expressed as minimum milliseconds per byte for this server.
     */
-    public ThrottledInputstream(ThrottledConnection connection, Server server, InputStream is, double minimumMillisecondsPerBytePerServer, DataSession dataSession)
+    public ThrottledInputstream(ThrottledConnection connection, Server server, InputStream is, double minimumMillisecondsPerBytePerServer)
     {
       this.throttledConnection = connection;
       this.server = server;
       this.inputStream = is;
       this.minimumMillisecondsPerBytePerServer = minimumMillisecondsPerBytePerServer;
-      this.dataSession = dataSession;
     }
 
     /** Read a byte.
@@ -980,8 +836,6 @@ public class ThrottledFetcher
         try
         {
           amt = inputStream.read(b,off,len);
-          if (recordEverything && amt != -1)
-            dataSession.write(b,off,amt);
           return amt;
         }
         finally
@@ -1337,4 +1191,265 @@ public class ThrottledFetcher
 
   }
 
+  /** This thread does the actual socket communication with the server.
+  * It's set up so that it can be abandoned at shutdown time.
+  *
+  * The way it works is as follows:
+  * - it starts the transaction
+  * - it receives the response, and saves that for the calling class to inspect
+  * - it transfers the data part to an input stream provided to the calling class
+  * - it shuts the connection down
+  *
+  * If there is an error, the sequence is aborted, and an exception is recorded
+  * for the calling class to examine.  The outcome of each phase is always published
+  * under this object's monitor, followed by notifyAll().  This holds whether the phase
+  * produced a result, a recorded exception, or an interrupted I/O.  As a result, a caller
+  * waiting in one of the accessors is always released and never waits on a thread that
+  * has already exited.
+  *
+  * The HTTP request itself executes WITHOUT holding this object's monitor.  Callers
+  * waiting for the response therefore stay in wait(), which is interruptible, instead
+  * of blocking on monitor entry, which is not.
+  *
+  * The calling class basically accepts the sequence above.  It starts the
+  * thread, and tries to get a response code.  If instead an exception is seen,
+  * the exception is thrown up the stack.
+  */
+  protected static class ExecuteMethodThread extends Thread
+  {
+    /** The connection */
+    protected final ThrottledConnection theConnection;
+    /** The connection bandwidth we want */
+    protected final double minimumMillisecondsPerBytePerServer;
+    /** The server object we use to track connections and fetches. */
+    protected final Server server;
+    /** Client and method, all preconfigured */
+    protected final HttpClient httpClient;
+    protected final HttpRequestBase executeMethod;
+
+    /** The response, once execute() has returned; guarded by this. */
+    protected HttpResponse response = null;
+    /** Exception thrown while executing the request, if any; guarded by this. */
+    protected Throwable responseException = null;
+    /** Cross-thread body stream handed to the caller; null when there is no body stream. */
+    protected XThreadInputStream threadStream = null;
+    /** True once body-stream setup completed without error (threadStream may still be null). */
+    protected boolean threadCreated = false;
+    /** Exception thrown while setting up the body stream, if any; guarded by this. */
+    protected Throwable streamException = null;
+
+    /** Exception thrown while aborting the request at the end of run(), if any. */
+    protected Throwable shutdownException = null;
+
+    /** Any other exception that escaped run(); not reported by any accessor of this class. */
+    protected Throwable generalException = null;
+
+    public ExecuteMethodThread(ThrottledConnection theConnection, Server server,
+      double minimumMillisecondsPerBytePerServer,
+      HttpClient httpClient, HttpRequestBase executeMethod)
+    {
+      super();
+      setDaemon(true);
+      this.theConnection = theConnection;
+      this.server = server;
+      this.minimumMillisecondsPerBytePerServer = minimumMillisecondsPerBytePerServer;
+      this.httpClient = httpClient;
+      this.executeMethod = executeMethod;
+    }
+
+    public void run()
+    {
+      try
+      {
+        try
+        {
+          // Phase 1: execute the request.  This can block for a long time, so it runs
+          // outside the monitor; only the outcome is published under the lock.
+          HttpResponse executeResponse = null;
+          Throwable executeException = null;
+          try
+          {
+            executeResponse = httpClient.execute(executeMethod);
+          }
+          catch (Throwable e)
+          {
+            // This deliberately includes InterruptedIOException (e.g. an aborted request).
+            // It must be recorded, not rethrown.  Otherwise nothing would be published,
+            // and callers waiting for the response would block forever.
+            executeException = e;
+          }
+          synchronized (this)
+          {
+            response = executeResponse;
+            responseException = executeException;
+            this.notifyAll();
+          }
+
+          if (executeException == null)
+          {
+            // Phase 2: wrap the response body (if any) for the calling thread.
+            XThreadInputStream bodyQueue = null;
+            Throwable bodyException = null;
+            try
+            {
+              // getEntity() returns null when the response carries no message body
+              // (e.g. a 304 Not Modified reply).  That is reported as a null stream,
+              // not as an error.
+              InputStream bodyStream = null;
+              if (executeResponse.getEntity() != null)
+                bodyStream = executeResponse.getEntity().getContent();
+              if (bodyStream != null)
+              {
+                bodyStream = new ThrottledInputstream(theConnection,server,bodyStream,minimumMillisecondsPerBytePerServer);
+                bodyQueue = new XThreadInputStream(bodyStream);
+              }
+            }
+            catch (Throwable e)
+            {
+              // As above, interrupted I/O is recorded so that waiters are released.
+              bodyException = e;
+            }
+            synchronized (this)
+            {
+              threadStream = bodyQueue;
+              streamException = bodyException;
+              threadCreated = (bodyException == null);
+              this.notifyAll();
+            }
+
+            // Phase 3: pump the content until it is exhausted or the stream is aborted.
+            if (bodyException == null && bodyQueue != null)
+              bodyQueue.stuffQueue();
+          }
+        }
+        finally
+        {
+          // Always abort the request so its connection is not left open, and wake any waiters.
+          synchronized (this)
+          {
+            try
+            {
+              executeMethod.abort();
+            }
+            catch (Throwable e)
+            {
+              shutdownException = e;
+            }
+            this.notifyAll();
+          }
+        }
+      }
+      catch (Throwable e)
+      {
+        // Only failures of the content pump (stuffQueue) are expected to reach this point.
+        this.generalException = e;
+      }
+    }
+
+    /** Wait for the response and return its HTTP status code.
+    * Rethrows the exception recorded while executing the request, if there was one.
+    */
+    public int getResponseCode()
+      throws InterruptedException, IOException, HttpException
+    {
+      // Must wait until the response object is there
+      while (true)
+      {
+        synchronized (this)
+        {
+          checkException(responseException);
+          if (response != null)
+            return response.getStatusLine().getStatusCode();
+          wait();
+        }
+      }
+    }
+
+    /** Wait for the response and return the value of the first header with the given name,
+    * or null if the response has no such header.
+    * Rethrows the exception recorded while executing the request, if there was one.
+    */
+    public String getFirstHeader(String headerName)
+      throws InterruptedException, IOException, HttpException
+    {
+      // Must wait for the response object to appear
+      while (true)
+      {
+        synchronized (this)
+        {
+          checkException(responseException);
+          if (response != null)
+          {
+            Header h = response.getFirstHeader(headerName);
+            if (h == null)
+              return null;
+            return h.getValue();
+          }
+          wait();
+        }
+      }
+    }
+
+    /** Wait for the body stream and return it (null if the response has no body stream).
+    * @throws IllegalStateException if the request itself failed; check the response first.
+    */
+    public InputStream getSafeInputStream()
+      throws InterruptedException, IOException, HttpException
+    {
+      // Must wait until stream is created, or until we note an exception was thrown.
+      while (true)
+      {
+        synchronized (this)
+        {
+          if (responseException != null)
+            throw new IllegalStateException("Check for response before getting stream");
+          checkException(streamException);
+          if (threadCreated)
+            return threadStream;
+          wait();
+        }
+      }
+    }
+
+    /** Abort the body transfer, if one exists.
+    * This is cleanup.  It is called whether or not the stream was consumed, and whether
+    * or not the request succeeded.  It waits until the worker has finished (or failed)
+    * setting up the body stream, then aborts that stream.  Aborting the stream makes
+    * stuffQueue() return and lets the worker thread finish.
+    * If the request or the stream setup failed, there is nothing to abort and this returns
+    * quietly.  Those failures are reported by getResponseCode()/getSafeInputStream(), not here.
+    * The IOException/HttpException clauses are kept for compatibility with existing callers.
+    */
+    public void abort()
+      throws InterruptedException, IOException, HttpException
+    {
+      while (true)
+      {
+        synchronized (this)
+        {
+          if (responseException != null || streamException != null)
+            return;
+          if (threadCreated)
+          {
+            if (threadStream != null)
+              threadStream.abort();
+            return;
+          }
+          wait();
+        }
+      }
+    }
+
+    /** Wait for the worker thread to exit, then rethrow any exception from aborting the request. */
+    public void finishUp()
+      throws InterruptedException, IOException, HttpException
+    {
+      join();
+      checkException(shutdownException);
+    }
+
+    /** Rethrow a recorded worker-thread exception, if there is one.
+    * IOException, HttpException, RuntimeException and Error are rethrown as-is; anything else
+    * is wrapped in a RuntimeException.  The recorded field is NOT cleared, so checking the same
+    * field again rethrows the same exception.
+    */
+    protected synchronized void checkException(Throwable exception)
+      throws IOException, HttpException
+    {
+      if (exception != null)
+      {
+        Throwable e = exception;
+        if (e instanceof IOException)
+          throw (IOException)e;
+        else if (e instanceof HttpException)
+          throw (HttpException)e;
+        else if (e instanceof RuntimeException)
+          throw (RuntimeException)e;
+        else if (e instanceof Error)
+          throw (Error)e;
+        else
+          throw new RuntimeException("Unhandled exception of type: "+e.getClass().getName(),e);
+      }
+    }
+
+  }
+
 }

Modified: manifoldcf/branches/CONNECTORS-120/framework/core/src/main/java/org/apache/manifoldcf/core/common/XThreadInputStream.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-120/framework/core/src/main/java/org/apache/manifoldcf/core/common/XThreadInputStream.java?rev=1410236&r1=1410235&r2=1410236&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-120/framework/core/src/main/java/org/apache/manifoldcf/core/common/XThreadInputStream.java (original)
+++ manifoldcf/branches/CONNECTORS-120/framework/core/src/main/java/org/apache/manifoldcf/core/common/XThreadInputStream.java Fri Nov 16 07:57:44 2012
@@ -30,7 +30,9 @@ public class XThreadInputStream extends 
   private int startPoint = 0;
   private int byteCount = 0;
   private boolean streamEnd = false;
+  /** Read failure captured by stuffQueue(); the reading side rethrows it instead of waiting for more data. */
+  private IOException failureException = null;
   private InputStream sourceStream;
+  /** Set by abort(); stuffQueue() returns at its next loop iteration once this is true. */
+  private boolean abort = false;
   
   /** Constructor */
   public XThreadInputStream(InputStream sourceStream)
@@ -38,6 +40,17 @@ public class XThreadInputStream extends 
     this.sourceStream = sourceStream;
   }
   
+  /** Ask stuffQueue() to stop filling the buffer.
+  * May be called from any thread.  It sets the abort flag under this object's monitor
+  * and wakes every thread waiting on that monitor, so the flag is noticed promptly.
+  * NOTE(review): only stuffQueue() is seen checking the flag in this change; confirm that
+  * a reader blocked waiting for data cannot be left waiting after an abort.
+  */
+  public synchronized void abort()
+  {
+    this.abort = true;
+    this.notifyAll();
+  }
+  
   /** This method is called from the helper thread side, to keep the queue
   * stuffed.  It exits when the stream is empty, or when interrupted.
   */
@@ -50,7 +63,7 @@ public class XThreadInputStream extends 
       int readStartPoint;
       synchronized (this)
       {
-        if (streamEnd)
+        if (streamEnd || abort)
           return;
         // Calculate amount to read
         maxToRead = buffer.length - byteCount;
@@ -65,10 +78,23 @@ public class XThreadInputStream extends 
       // See how to break up the reads into pieces.  We only do one piece right now.
       if (readStartPoint + maxToRead >= buffer.length)
         maxToRead = buffer.length - readStartPoint;
-      int amt = sourceStream.read(buffer, readStartPoint, maxToRead);
+      
+      int amt = -1;
+      IOException exception = null;
+      try
+      {
+        amt = sourceStream.read(buffer, readStartPoint, maxToRead);
+      }
+      catch (IOException e)
+      {
+        exception = e;
+      }
+      
       synchronized (this)
       {
-        if (amt == -1)
+        if (exception != null)
+          failureException = exception;
+        else if (amt == -1)
           streamEnd = true;
         else
           byteCount += amt;
@@ -129,6 +155,8 @@ public class XThreadInputStream extends 
                 return totalAmt;
               return -1;
             }
+            if (failureException != null)
+              throw failureException;
             wait();
             continue;
           }