You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2012/11/16 08:57:45 UTC
svn commit: r1410236 - in /manifoldcf/branches/CONNECTORS-120:
connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/
framework/core/src/main/java/org/apache/manifoldcf/core/common/
Author: kwright
Date: Fri Nov 16 07:57:44 2012
New Revision: 1410236
URL: http://svn.apache.org/viewvc?rev=1410236&view=rev
Log:
Finish initial code for port of RSS connector to httpcomponents
Modified:
manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/IThrottledConnection.java
manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/RSSConnector.java
manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/Robots.java
manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/ThrottledFetcher.java
manifoldcf/branches/CONNECTORS-120/framework/core/src/main/java/org/apache/manifoldcf/core/common/XThreadInputStream.java
Modified: manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/IThrottledConnection.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/IThrottledConnection.java?rev=1410236&r1=1410235&r2=1410236&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/IThrottledConnection.java (original)
+++ manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/IThrottledConnection.java Fri Nov 16 07:57:44 2012
@@ -67,7 +67,6 @@ public interface IThrottledConnection
* @return the status code: success, static error, or dynamic error.
*/
public int executeFetch(String protocol, int port, String urlPath, String userAgent, String from,
- String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword,
String lastETag, String lastModified)
throws ManifoldCFException, ServiceInterruption;
Modified: manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/RSSConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/RSSConnector.java?rev=1410236&r1=1410235&r2=1410236&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/RSSConnector.java (original)
+++ manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/RSSConnector.java Fri Nov 16 07:57:44 2012
@@ -974,7 +974,12 @@ public class RSSConnector extends org.ap
maxOpenConnectionsPerServer,
minimumMillisecondsPerFetchPerServer,
connectionLimit,
- feedTimeout);
+ feedTimeout,
+ proxyHost,
+ proxyPort,
+ proxyAuthDomain,
+ proxyAuthUsername,
+ proxyAuthPassword);
try
{
// Begin the fetch
@@ -983,8 +988,7 @@ public class RSSConnector extends org.ap
{
// Execute the request.
// Use the connect timeout from the document specification!
- int status = connection.executeFetch(protocol,port,pathPart,userAgent,from,proxyHost,proxyPort,
- proxyAuthDomain,proxyAuthUsername,proxyAuthPassword,
+ int status = connection.executeFetch(protocol,port,pathPart,userAgent,from,
lastETagValue,lastModifiedValue);
switch (status)
{
Modified: manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/Robots.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/Robots.java?rev=1410236&r1=1410235&r2=1410236&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/Robots.java (original)
+++ manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/Robots.java Fri Nov 16 07:57:44 2012
@@ -446,14 +446,15 @@ public class Robots
// Do the fetch
IThrottledConnection connection = fetcher.createConnection(hostName,minimumMillisecondsPerBytePerServer,
- maxOpenConnectionsPerServer,minimumMillisecondsPerFetchPerServer,connectionLimit,ROBOT_TIMEOUT_MILLISECONDS);
+ maxOpenConnectionsPerServer,minimumMillisecondsPerFetchPerServer,connectionLimit,ROBOT_TIMEOUT_MILLISECONDS,
+ proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
try
{
connection.beginFetch(ROBOT_CONNECTION_TYPE);
try
{
int responseCode = connection.executeFetch(protocol,port,ROBOT_FILE_NAME,userAgent,from,
- proxyHost, proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword,null,null);
+ null,null);
switch (responseCode)
{
case IThrottledConnection.STATUS_OK:
Modified: manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/ThrottledFetcher.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/ThrottledFetcher.java?rev=1410236&r1=1410235&r2=1410236&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/ThrottledFetcher.java (original)
+++ manifoldcf/branches/CONNECTORS-120/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/ThrottledFetcher.java Fri Nov 16 07:57:44 2012
@@ -19,6 +19,7 @@
package org.apache.manifoldcf.crawler.connectors.rss;
import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.core.common.XThreadInputStream;
import org.apache.manifoldcf.agents.interfaces.*;
import org.apache.manifoldcf.crawler.interfaces.*;
import org.apache.manifoldcf.crawler.system.Logging;
@@ -26,11 +27,34 @@ import org.apache.manifoldcf.crawler.sys
import java.util.*;
import java.io.*;
-import org.apache.commons.httpclient.*;
-import org.apache.commons.httpclient.methods.*;
-import org.apache.commons.httpclient.params.*;
-import org.apache.commons.httpclient.protocol.*;
-import org.apache.commons.httpclient.auth.*;
+import org.apache.http.conn.ClientConnectionManager;
+import org.apache.http.client.HttpClient;
+import org.apache.http.impl.conn.PoolingClientConnectionManager;
+import org.apache.http.conn.scheme.Scheme;
+import org.apache.http.conn.ssl.SSLSocketFactory;
+import org.apache.http.conn.ssl.BrowserCompatHostnameVerifier;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.NTCredentials;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.impl.client.DefaultRedirectStrategy;
+import org.apache.http.util.EntityUtils;
+import org.apache.http.params.BasicHttpParams;
+import org.apache.http.params.HttpParams;
+import org.apache.http.params.CoreConnectionPNames;
+import org.apache.http.HttpStatus;
+import org.apache.http.HttpHost;
+import org.apache.http.Header;
+import org.apache.http.conn.params.ConnRoutePNames;
+import org.apache.http.message.BasicHeader;
+
+import org.apache.http.conn.ConnectTimeoutException;
+import org.apache.http.client.CircularRedirectException;
+import org.apache.http.NoHttpResponseException;
+import org.apache.http.HttpException;
/** This class uses httpclient to fetch stuff from webservers. However, it additionally controls the fetch
* rate in two ways: first, controlling the overall bandwidth used per server, and second, limiting the number
@@ -137,7 +161,8 @@ public class ThrottledFetcher
* @param connectionTimeoutMilliseconds is the number of milliseconds to wait for the connection before timing out.
*/
public synchronized IThrottledConnection createConnection(String serverName, double minimumMillisecondsPerBytePerServer,
- int maxOpenConnectionsPerServer, long minimumMillisecondsPerFetchPerServer, int connectionLimit, int connectionTimeoutMilliseconds)
+ int maxOpenConnectionsPerServer, long minimumMillisecondsPerFetchPerServer, int connectionLimit, int connectionTimeoutMilliseconds,
+ String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
throws ManifoldCFException, ServiceInterruption
{
Server server;
@@ -148,7 +173,9 @@ public class ThrottledFetcher
serverMap.put(serverName,server);
}
- return new ThrottledConnection(server,minimumMillisecondsPerBytePerServer,maxOpenConnectionsPerServer,minimumMillisecondsPerFetchPerServer,connectionTimeoutMilliseconds,connectionLimit);
+ return new ThrottledConnection(server,minimumMillisecondsPerBytePerServer,maxOpenConnectionsPerServer,minimumMillisecondsPerFetchPerServer,
+ connectionTimeoutMilliseconds,connectionLimit,
+ proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
}
/** Poll. This method is designed to allow idle connections to be closed and freed.
@@ -185,236 +212,6 @@ public class ThrottledFetcher
}
}
- // Where to record result info/file data
- protected final static String resultLogFile = "/common/rss/resultlog";
- // Where to record the actual data
- protected final static String dataFileFolder = "/common/rss/data/";
-
- // This is the one instance of the output class
- protected static DataRecorder dataRecorder = new DataRecorder();
-
- /** This class takes care of recording data and results for posterity */
- protected static class DataRecorder
- {
- protected int documentNumber = 0;
- protected long startTime = System.currentTimeMillis();
- protected boolean initialized = false;
-
- public DataRecorder()
- {
- }
-
- protected String readFile(File f)
- throws IOException
- {
- InputStream is = new FileInputStream(f);
- try
- {
- Reader r = new InputStreamReader(is);
- try
- {
- char[] characterBuf = new char[32];
- int amt = r.read(characterBuf);
- String rval = new String(characterBuf,0,amt);
- return rval;
- }
- finally
- {
- r.close();
- }
- }
- finally
- {
- is.close();
- }
- }
-
- protected void writeFile(File f, String data)
- throws IOException
- {
- OutputStream os = new FileOutputStream(f);
- try
- {
- Writer w = new OutputStreamWriter(os);
- try
- {
- w.write(data);
- }
- finally
- {
- w.flush();
- }
- }
- finally
- {
- os.close();
- }
- }
-
- protected synchronized void initializeParameters()
- {
- if (initialized)
- return;
-
- // Create folder, if it doesn't yet exist
- try
- {
- new File(dataFileFolder).mkdirs();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
-
- // Either read starting timestamp, or write it
- File timestampFile = new File("/common/rss/timestamp.log");
- if (timestampFile.exists())
- {
- try
- {
- String data = readFile(timestampFile);
- startTime = new Long(data).longValue();
- }
- catch (IOException e)
- {
- e.printStackTrace();
- }
- }
- else
- {
- try
- {
- writeFile(timestampFile,new Long(startTime).toString());
- }
- catch (IOException e)
- {
- e.printStackTrace();
- }
- }
-
- // Read starting document number, if it exists
- File documentNumberFile = new File("/common/rss/docnumber.log");
- if (documentNumberFile.exists())
- {
- try
- {
- String data = readFile(documentNumberFile);
- documentNumber = Integer.parseInt(data);
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
-
- initialized = true;
- }
-
- public DataSession getSession(String url)
- throws ManifoldCFException
- {
- initializeParameters();
- return new DataSession(this,url);
- }
-
- /** Atomically write resultlog record, returning data file name to use */
- public synchronized String writeResponseRecord(String url, int responseCode, ArrayList headerNames, ArrayList headerValues)
- throws ManifoldCFException
- {
- // Open log file
- try
- {
- OutputStream os = new FileOutputStream(resultLogFile,true);
- try
- {
- OutputStreamWriter writer = new OutputStreamWriter(os,"utf-8");
- try
- {
- String documentName = Integer.toString(documentNumber++);
- writer.write("Time: "+new Long(System.currentTimeMillis()-startTime).toString()+"\n");
- writer.write("URI: "+url+"\n");
- writer.write("File: "+documentName+"\n");
- writer.write("Code: "+Integer.toString(responseCode)+"\n");
- int i = 0;
- while (i < headerNames.size())
- {
- writer.write("Header: "+(String)headerNames.get(i)+":"+(String)headerValues.get(i)+"\n");
- i++;
- }
- writeFile(new File("/common/rss/docnumber.log"),new Integer(documentNumber).toString());
- return documentName;
- }
- finally
- {
- writer.close();
- }
- }
- finally
- {
- os.close();
- }
- }
- catch (IOException e)
- {
- throw new ManifoldCFException("Error recording file info: "+e.getMessage(),e);
- }
-
- }
-
-
- }
-
- /** Helper class for the above */
- protected static class DataSession
- {
- protected DataRecorder dr;
- protected String url;
- protected int responseCode = 0;
- protected ArrayList headerNames = new ArrayList();
- protected ArrayList headerValues = new ArrayList();
- protected String documentName = null;
-
- public DataSession(DataRecorder dr, String url)
- {
- this.dr = dr;
- this.url = url;
- }
-
- public void setResponseCode(int responseCode)
- {
- this.responseCode = responseCode;
- }
-
- public void addHeader(String headerName, String headerValue)
- {
- headerNames.add(headerName);
- headerValues.add(headerValue);
- }
-
- public void endHeader()
- throws ManifoldCFException
- {
- documentName = dr.writeResponseRecord(url,responseCode,headerNames,headerValues);
- }
-
- public void write(byte[] theBytes, int off, int length)
- throws IOException
- {
- if (documentName == null)
- throw new IOException("Must end header before reading data!");
- OutputStream os = new FileOutputStream(dataFileFolder+documentName,true);
- try
- {
- os.write(theBytes,off,length);
- }
- finally
- {
- os.close();
- }
- }
-
- }
-
/** This class represents an established connection to a URL.
*/
protected static class ThrottledConnection implements IThrottledConnection
@@ -428,7 +225,7 @@ public class ThrottledFetcher
/** The server object we use to track connections and fetches. */
protected Server server;
/** The method object */
- protected HttpMethodBase fetchMethod = null;
+ protected HttpRequestBase executeMethod = null;
/** The start-fetch time */
protected long startFetchTime = -1L;
/** The error trace, if any */
@@ -441,18 +238,23 @@ public class ThrottledFetcher
protected String fetchType = null;
/** The current bytes in the current fetch */
protected long fetchCounter = 0L;
- /** The connection pool (max size 1) */
- protected MultiThreadedHttpConnectionManager connectionManager = null;
/** Connection timeout in milliseconds */
protected int connectionTimeoutMilliseconds;
- /** Hack added to record all access data from current crawler */
- protected DataSession dataSession = null;
-
+ /** The thread that is actually doing the work */
+ protected ExecuteMethodThread methodThread = null;
+ /** Set if thread has been started */
+ protected boolean threadStarted = false;
+ /** The client connection manager */
+ protected ClientConnectionManager connectionManager = null;
+ /** The httpclient */
+ protected HttpClient httpClient = null;
+
/** Constructor.
*/
public ThrottledConnection(Server server, double minimumMillisecondsPerBytePerServer, int maxOpenConnectionsPerServer,
- long minimumMillisecondsPerFetchPerServer, int connectionTimeoutMilliseconds, int connectionLimit)
+ long minimumMillisecondsPerFetchPerServer, int connectionTimeoutMilliseconds, int connectionLimit,
+ String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
throws ManifoldCFException
{
this.minimumMillisecondsPerBytePerServer = minimumMillisecondsPerBytePerServer;
@@ -460,11 +262,42 @@ public class ThrottledFetcher
this.minimumMillisecondsPerFetchPerServer = minimumMillisecondsPerFetchPerServer;
this.server = server;
this.connectionTimeoutMilliseconds = connectionTimeoutMilliseconds;
- connectionManager = new MultiThreadedHttpConnectionManager();
- HttpConnectionManagerParams httpConParam = connectionManager.getParams();
- httpConParam.setMaxTotalConnections(1);
- httpConParam.setConnectionTimeout(connectionTimeoutMilliseconds);
- httpConParam.setSoTimeout(connectionTimeoutMilliseconds);
+ PoolingClientConnectionManager localConnectionManager = new PoolingClientConnectionManager();
+ localConnectionManager.setMaxTotal(1);
+ connectionManager = localConnectionManager;
+
+ BasicHttpParams params = new BasicHttpParams();
+ params.setBooleanParameter(CoreConnectionPNames.TCP_NODELAY,true);
+ params.setBooleanParameter(CoreConnectionPNames.STALE_CONNECTION_CHECK,false);
+ params.setIntParameter(CoreConnectionPNames.SO_TIMEOUT,connectionTimeoutMilliseconds);
+ params.setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT,connectionTimeoutMilliseconds);
+ DefaultHttpClient localHttpClient = new DefaultHttpClient(connectionManager,params);
+ localHttpClient.setRedirectStrategy(new DefaultRedirectStrategy());
+
+ // If there's a proxy, set that too.
+ if (proxyHost != null && proxyHost.length() > 0)
+ {
+
+ // Configure proxy authentication
+ if (proxyAuthUsername != null && proxyAuthUsername.length() > 0)
+ {
+ if (proxyAuthPassword == null)
+ proxyAuthPassword = "";
+ if (proxyAuthDomain == null)
+ proxyAuthDomain = "";
+
+ localHttpClient.getCredentialsProvider().setCredentials(
+ new AuthScope(proxyHost, proxyPort),
+ new NTCredentials(proxyAuthUsername, proxyAuthPassword, currentHost, proxyAuthDomain));
+ }
+
+ HttpHost proxy = new HttpHost(proxyHost, proxyPort);
+
+ httpClient.getParams().setParameter(ConnRoutePNames.DEFAULT_PROXY, proxy);
+ }
+
+ httpClient = localHttpClient;
+
registerGlobalHandle(connectionLimit);
server.registerConnection(maxOpenConnectionsPerServer);
}
@@ -486,6 +319,7 @@ public class ThrottledFetcher
{
throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
}
+ threadStarted = false;
}
/** Log the fetch of a number of bytes. */
@@ -494,46 +328,6 @@ public class ThrottledFetcher
fetchCounter += (long)count;
}
- protected static class ExecuteMethodThread extends Thread
- {
- protected HttpClient client;
- protected HostConfiguration hostConfiguration;
- protected HttpMethodBase executeMethod;
- protected Throwable exception = null;
- protected int rval = 0;
-
- public ExecuteMethodThread(HttpClient client, HostConfiguration hostConfiguration, HttpMethodBase executeMethod)
- {
- super();
- setDaemon(true);
- this.client = client;
- this.hostConfiguration = hostConfiguration;
- this.executeMethod = executeMethod;
- }
-
- public void run()
- {
- try
- {
- // Call the execute method appropriately
- rval = client.executeMethod(hostConfiguration,executeMethod);
- }
- catch (Throwable e)
- {
- this.exception = e;
- }
- }
-
- public Throwable getException()
- {
- return exception;
- }
-
- public int getResponse()
- {
- return rval;
- }
- }
/** Execute the fetch and get the return code. This method uses the
* standard logging mechanism to keep track of the fetch attempt. It also
@@ -556,7 +350,6 @@ public class ThrottledFetcher
* @return the status code: success, static error, or dynamic error.
*/
public int executeFetch(String protocol, int port, String urlPath, String userAgent, String from,
- String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword,
String lastETag, String lastModified)
throws ManifoldCFException, ServiceInterruption
{
@@ -568,75 +361,31 @@ public class ThrottledFetcher
sb.append(urlPath);
myUrl = sb.toString();
- if (recordEverything)
- // Start a new data session
- dataSession = dataRecorder.getSession(myUrl);
-
+ // Create the get method
+ executeMethod = new HttpGet(myUrl);
+
+ startFetchTime = System.currentTimeMillis();
+
+ // Set all appropriate headers
+ executeMethod.setHeader(new BasicHeader("User-Agent",userAgent));
+ executeMethod.setHeader(new BasicHeader("From",from));
+ if (lastETag != null)
+ executeMethod.setHeader(new BasicHeader("ETag",lastETag));
+ if (lastModified != null)
+ executeMethod.setHeader(new BasicHeader("Last-Modified",lastModified));
+ // Create the execution thread.
+ methodThread = new ExecuteMethodThread(this, server,
+ minimumMillisecondsPerBytePerServer, httpClient, executeMethod);
+ // Start the method thread, which will start the transaction
try
{
- HttpClient client = new HttpClient(connectionManager);
- // Permit circular redirections, because that is how some sites set cookies
- client.getParams().setParameter(org.apache.commons.httpclient.params.HttpClientParams.ALLOW_CIRCULAR_REDIRECTS,new Boolean(true));
- fetchMethod = new GetMethod();
- HostConfiguration config = new HostConfiguration();
-
- config.setHost(server.getServerName(),port,Protocol.getProtocol(protocol));
-
- // If there's a proxy, set that too.
- if (proxyHost != null && proxyHost.length() > 0)
- {
- config.setProxy(proxyHost,proxyPort);
- if (proxyAuthUsername != null && proxyAuthUsername.length() > 0)
- {
- if (proxyAuthPassword == null)
- proxyAuthPassword = "";
- if (proxyAuthDomain == null)
- proxyAuthDomain = "";
- // Set up NTLM credentials for this fetch too.
- client.getState().setProxyCredentials(AuthScope.ANY,
- new NTCredentials(proxyAuthUsername,proxyAuthPassword,currentHost,proxyAuthDomain));
- }
- }
-
- startFetchTime = System.currentTimeMillis();
- fetchMethod.setURI(new URI(urlPath,true));
-
- // Set all appropriate headers
- fetchMethod.setRequestHeader("User-Agent",userAgent);
- fetchMethod.setRequestHeader("From",from);
- if (lastETag != null)
- fetchMethod.setRequestHeader("ETag",lastETag);
- if (lastModified != null)
- fetchMethod.setRequestHeader("Last-Modified",lastModified);
-
- fetchMethod.getParams().setSoTimeout(connectionTimeoutMilliseconds);
- // fetchMethod.getParams().setIntParameter("http.socket.timeout", connectionTimeoutMilliseconds);
-
- fetchMethod.setFollowRedirects(true);
-
- // Fire it off!
+ methodThread.start();
+ threadStarted = true;
+ // We want to wait until at least the execution has fired, and then figure out where we
+ // stand
try
{
- ExecuteMethodThread t = new ExecuteMethodThread(client,config,fetchMethod);
- try
- {
- t.start();
- t.join();
- Throwable thr = t.getException();
- if (thr != null)
- {
- throw thr;
- }
- statusCode = t.getResponse();
- if (recordEverything)
- dataSession.setResponseCode(statusCode);
- }
- catch (InterruptedException e)
- {
- t.interrupt();
- // We need the caller to abandon any connections left around, so rethrow in a way that forces them to process the event properly.
- throw e;
- }
+ int statusCode = methodThread.getResponseCode();
long currentTime;
switch (statusCode)
{
@@ -668,116 +417,96 @@ public class ThrottledFetcher
default:
return STATUS_PAGEERROR;
}
-
- }
- catch (java.net.SocketTimeoutException e)
- {
- throwable = e;
- long currentTime = System.currentTimeMillis();
- throw new ServiceInterruption("Timed out waiting for IO for '"+myUrl+"': "+e.getMessage(), e, currentTime + 300000L,
- currentTime + 120L * 60000L,-1,false);
- }
- catch (org.apache.commons.httpclient.ConnectTimeoutException e)
- {
- throwable = e;
- long currentTime = System.currentTimeMillis();
- throw new ServiceInterruption("Timed out waiting for connect for '"+myUrl+"': "+e.getMessage(), e, currentTime + 60L * 60000L,
- currentTime + 720L * 60000L,-1,false);
- }
- catch (InterruptedIOException e)
- {
- throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
}
- catch (org.apache.commons.httpclient.CircularRedirectException e)
+ catch (InterruptedException e)
{
- throwable = e;
- statusCode = FETCH_CIRCULAR_REDIRECT;
- if (recordEverything)
- dataSession.setResponseCode(statusCode);
- return STATUS_PAGEERROR;
- }
- catch (org.apache.commons.httpclient.NoHttpResponseException e)
- {
- throwable = e;
- // Give up after 2 hours.
- long currentTime = System.currentTimeMillis();
- throw new ServiceInterruption("Timed out waiting for response for '"+myUrl+"'", e, currentTime + 15L * 60000L,
- currentTime + 120L * 60000L,-1,false);
- }
- catch (java.net.ConnectException e)
- {
- throwable = e;
- // Give up after 6 hours.
- long currentTime = System.currentTimeMillis();
- throw new ServiceInterruption("Timed out waiting for a connection for '"+myUrl+"'", e, currentTime + 1000000L,
- currentTime + 720L * 60000L,-1,false);
- }
- catch (java.net.NoRouteToHostException e)
- {
- // This exception means we know the IP address but can't get there. That's either a firewall issue, or it's something transient
- // with the network. Some degree of retry is probably wise.
- throwable = e;
- long currentTime = System.currentTimeMillis();
- throw new ServiceInterruption("No route to host for '"+myUrl+"'", e, currentTime + 1000000L,
- currentTime + 720L * 60000L,-1,false);
- }
- catch (IOException e)
- {
- // Treat this as a bad url. We don't know what happened, but it isn't something we are going to naively
- // retry on.
- throwable = e;
- statusCode = FETCH_IO_ERROR;
- if (recordEverything)
- dataSession.setResponseCode(statusCode);
- return STATUS_PAGEERROR;
+ methodThread.interrupt();
+ methodThread = null;
+ threadStarted = false;
+ throw e;
}
}
catch (InterruptedException e)
{
// Drop the current connection on the floor, so it cannot be reused.
- fetchMethod = null;
+ executeMethod = null;
throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED);
}
- catch (URIException e)
+ catch (java.net.MalformedURLException e)
{
throwable = new ManifoldCFException("Illegal URI: '"+myUrl+"'",e);
statusCode = FETCH_BAD_URI;
- if (recordEverything)
- dataSession.setResponseCode(statusCode);
return STATUS_PAGEERROR;
}
- catch (IllegalArgumentException e)
+ catch (java.net.SocketTimeoutException e)
{
- throwable = new ManifoldCFException("Illegal URI: '"+myUrl+"'",e);
- statusCode = FETCH_BAD_URI;
- if (recordEverything)
- dataSession.setResponseCode(statusCode);
- return STATUS_PAGEERROR;
+ throwable = e;
+ long currentTime = System.currentTimeMillis();
+ throw new ServiceInterruption("Timed out waiting for IO for '"+myUrl+"': "+e.getMessage(), e, currentTime + 300000L,
+ currentTime + 120L * 60000L,-1,false);
+ }
+ catch (ConnectTimeoutException e)
+ {
+ throwable = e;
+ long currentTime = System.currentTimeMillis();
+ throw new ServiceInterruption("Timed out waiting for connect for '"+myUrl+"': "+e.getMessage(), e, currentTime + 60L * 60000L,
+ currentTime + 720L * 60000L,-1,false);
}
- catch (IllegalStateException e)
+ catch (InterruptedIOException e)
{
- throwable = new ManifoldCFException("Illegal state while fetching URI: '"+myUrl+"'",e);
- statusCode = FETCH_SEQUENCE_ERROR;
- if (recordEverything)
- dataSession.setResponseCode(statusCode);
+ throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
+ }
+ catch (CircularRedirectException e)
+ {
+ throwable = e;
+ statusCode = FETCH_CIRCULAR_REDIRECT;
return STATUS_PAGEERROR;
}
- catch (ServiceInterruption e)
+ catch (NoHttpResponseException e)
+ {
+ throwable = e;
+ // Give up after 2 hours.
+ long currentTime = System.currentTimeMillis();
+ throw new ServiceInterruption("Timed out waiting for response for '"+myUrl+"'", e, currentTime + 15L * 60000L,
+ currentTime + 120L * 60000L,-1,false);
+ }
+ catch (java.net.ConnectException e)
{
- throw e;
+ throwable = e;
+ // Give up after 6 hours.
+ long currentTime = System.currentTimeMillis();
+ throw new ServiceInterruption("Timed out waiting for a connection for '"+myUrl+"'", e, currentTime + 1000000L,
+ currentTime + 720L * 60000L,-1,false);
}
- catch (ManifoldCFException e)
+ catch (java.net.NoRouteToHostException e)
{
- throw e;
+ // This exception means we know the IP address but can't get there. That's either a firewall issue, or it's something transient
+ // with the network. Some degree of retry is probably wise.
+ throwable = e;
+ long currentTime = System.currentTimeMillis();
+ throw new ServiceInterruption("No route to host for '"+myUrl+"'", e, currentTime + 1000000L,
+ currentTime + 720L * 60000L,-1,false);
+ }
+ catch (HttpException e)
+ {
+ throwable = e;
+ statusCode = FETCH_IO_ERROR;
+ return STATUS_PAGEERROR;
+ }
+ catch (IOException e)
+ {
+ // Treat this as a bad url. We don't know what happened, but it isn't something we are going to naively
+ // retry on.
+ throwable = e;
+ statusCode = FETCH_IO_ERROR;
+ return STATUS_PAGEERROR;
}
catch (Throwable e)
{
Logging.connectors.debug("RSS: Caught an unexpected exception: "+e.getMessage(),e);
throwable = e;
statusCode = FETCH_UNKNOWN_ERROR;
- if (recordEverything)
- dataSession.setResponseCode(statusCode);
return STATUS_PAGEERROR;
}
}
@@ -797,24 +526,65 @@ public class ThrottledFetcher
public InputStream getResponseBodyStream()
throws ManifoldCFException, ServiceInterruption
{
- if (fetchMethod == null)
- throw new ManifoldCFException("Attempt to get a response when there is no method");
+ if (executeMethod == null)
+ throw new ManifoldCFException("Attempt to get an input stream when there is no method");
+ if (methodThread == null || threadStarted == false)
+ throw new ManifoldCFException("Attempt to get an input stream when no method thread");
try
{
- if (recordEverything)
- dataSession.endHeader();
- InputStream bodyStream = fetchMethod.getResponseBodyAsStream();
- if (bodyStream == null)
- throw new ManifoldCFException("Failed to set up body response stream");
- return new ThrottledInputstream(this,server,bodyStream,minimumMillisecondsPerBytePerServer,dataSession);
+ return methodThread.getSafeInputStream();
}
- catch (IOException e)
+ catch (InterruptedException e)
+ {
+ methodThread.interrupt();
+ throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED);
+ }
+ catch (java.net.SocketTimeoutException e)
+ {
+ long currentTime = System.currentTimeMillis();
+ throw new ServiceInterruption("Timed out waiting for IO for '"+myUrl+"': "+e.getMessage(), e, currentTime + 300000L,
+ currentTime + 120L * 60000L,-1,false);
+ }
+ catch (ConnectTimeoutException e)
+ {
+ long currentTime = System.currentTimeMillis();
+ throw new ServiceInterruption("Timed out waiting for connect for '"+myUrl+"': "+e.getMessage(), e, currentTime + 60L * 60000L,
+ currentTime + 720L * 60000L,-1,false);
+ }
+ catch (InterruptedIOException e)
+ {
+ methodThread.interrupt();
+ throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
+ }
+ catch (NoHttpResponseException e)
+ {
+ // Give up after 2 hours.
+ long currentTime = System.currentTimeMillis();
+ throw new ServiceInterruption("Timed out waiting for response for '"+myUrl+"'", e, currentTime + 15L * 60000L,
+ currentTime + 120L * 60000L,-1,false);
+ }
+ catch (java.net.ConnectException e)
+ {
+ // Give up after 6 hours.
+ long currentTime = System.currentTimeMillis();
+ throw new ServiceInterruption("Timed out waiting for a stream connection for '"+myUrl+"'", e, currentTime + 1000000L,
+ currentTime + 720L * 60000L,-1,false);
+ }
+ catch (java.net.NoRouteToHostException e)
+ {
+ // This exception means we know the IP address but can't get there. That's either a firewall issue, or it's something transient
+ // with the network. Some degree of retry is probably wise.
+ long currentTime = System.currentTimeMillis();
+ throw new ServiceInterruption("No route to host for '"+myUrl+"'", e, currentTime + 1000000L,
+ currentTime + 720L * 60000L,-1,false);
+ }
+ catch (HttpException e)
{
- throw new ManifoldCFException("IO exception setting up response stream",e);
+ throw new ManifoldCFException("Http exception reading stream: "+e.getMessage(),e);
}
- catch (IllegalStateException e)
+ catch (IOException e)
{
- throw new ManifoldCFException("State error getting response body",e);
+ throw new ManifoldCFException("I/O exception reading stream: "+e.getMessage(),e);
}
}
@@ -825,12 +595,66 @@ public class ThrottledFetcher
public String getResponseHeader(String headerName)
throws ManifoldCFException, ServiceInterruption
{
- Header h = fetchMethod.getResponseHeader(headerName);
- if (h == null)
- return null;
- if (recordEverything)
- dataSession.addHeader(headerName,h.getValue());
- return h.getValue();
+ if (executeMethod == null)
+ throw new ManifoldCFException("Attempt to get a header when there is no method");
+ if (methodThread == null || threadStarted == false)
+ throw new ManifoldCFException("Attempt to get a header when no method thread");
+ try
+ {
+ return methodThread.getFirstHeader(headerName);
+ }
+ catch (InterruptedException e)
+ {
+ methodThread.interrupt();
+ throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED);
+ }
+ catch (java.net.SocketTimeoutException e)
+ {
+ long currentTime = System.currentTimeMillis();
+ throw new ServiceInterruption("Timed out waiting for IO for '"+myUrl+"': "+e.getMessage(), e, currentTime + 300000L,
+ currentTime + 120L * 60000L,-1,false);
+ }
+ catch (ConnectTimeoutException e)
+ {
+ long currentTime = System.currentTimeMillis();
+ throw new ServiceInterruption("Timed out waiting for connect for '"+myUrl+"': "+e.getMessage(), e, currentTime + 60L * 60000L,
+ currentTime + 720L * 60000L,-1,false);
+ }
+ catch (InterruptedIOException e)
+ {
+ methodThread.interrupt();
+ throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
+ }
+ catch (NoHttpResponseException e)
+ {
+ // Give up after 2 hours.
+ long currentTime = System.currentTimeMillis();
+ throw new ServiceInterruption("Timed out waiting for response for '"+myUrl+"'", e, currentTime + 15L * 60000L,
+ currentTime + 120L * 60000L,-1,false);
+ }
+ catch (java.net.ConnectException e)
+ {
+ // Give up after 6 hours.
+ long currentTime = System.currentTimeMillis();
+ throw new ServiceInterruption("Timed out waiting for a connection for '"+myUrl+"'", e, currentTime + 1000000L,
+ currentTime + 720L * 60000L,-1,false);
+ }
+ catch (java.net.NoRouteToHostException e)
+ {
+ // This exception means we know the IP address but can't get there. That's either a firewall issue, or it's something transient
+ // with the network. Some degree of retry is probably wise.
+ long currentTime = System.currentTimeMillis();
+ throw new ServiceInterruption("No route to host for '"+myUrl+"'", e, currentTime + 1000000L,
+ currentTime + 720L * 60000L,-1,false);
+ }
+ catch (HttpException e)
+ {
+ throw new ManifoldCFException("Http exception reading response: "+e.getMessage(),e);
+ }
+ catch (IOException e)
+ {
+ throw new ManifoldCFException("I/O exception reading response: "+e.getMessage(),e);
+ }
}
/** Done with the fetch. Call this when the fetch has been completed. A log entry will be generated
@@ -839,8 +663,31 @@ public class ThrottledFetcher
public void doneFetch(IVersionActivity activities)
throws ManifoldCFException
{
+
if (fetchType != null)
{
+ // Abort the connection, if not already complete
+ try
+ {
+ methodThread.abort();
+ }
+ catch (InterruptedException e)
+ {
+ throw new ManifoldCFException(e.getMessage(),e,ManifoldCFException.INTERRUPTED);
+ }
+ catch (InterruptedIOException e)
+ {
+ throw new ManifoldCFException(e.getMessage(),e,ManifoldCFException.INTERRUPTED);
+ }
+ catch (HttpException e)
+ {
+ throw new ManifoldCFException("Unexpected Http exception: "+e.getMessage(),e);
+ }
+ catch (IOException e)
+ {
+ throw new ManifoldCFException("Unexpected IO exception: "+e.getMessage(),e);
+ }
+
long endTime = System.currentTimeMillis();
server.endFetch();
@@ -854,19 +701,31 @@ public class ThrottledFetcher
if (Logging.connectors.isDebugEnabled())
Logging.connectors.debug("RSS: Fetch exception for '"+myUrl+"'",throwable);
}
- // Clear out all the parameters
- if (fetchMethod != null)
+
+ // Shut down (join) the connection thread, if any, and if it started
+ if (threadStarted)
{
try
{
- fetchMethod.releaseConnection();
+ methodThread.finishUp();
+ }
+ catch (InterruptedException e)
+ {
+ throw new ManifoldCFException(e.getMessage(),e,ManifoldCFException.INTERRUPTED);
+ }
+ catch (HttpException e)
+ {
+ throw new ManifoldCFException("Unexpected HTTP exception: "+e.getMessage(),e);
}
- catch (IllegalStateException e)
+ catch (IOException e)
{
- // looks like the fetch method didn't have one, or it was already released. Just eat the exception.
+ throw new ManifoldCFException("Unexpected IO exception: "+e.getMessage(),e);
}
- fetchMethod = null;
+ threadStarted = false;
+ methodThread = null;
}
+
+ executeMethod = null;
throwable = null;
startFetchTime = -1L;
myUrl = null;
@@ -902,17 +761,14 @@ public class ThrottledFetcher
/** The stream we are wrapping. */
protected InputStream inputStream;
- protected DataSession dataSession;
-
/** Constructor.
*/
- public ThrottledInputstream(ThrottledConnection connection, Server server, InputStream is, double minimumMillisecondsPerBytePerServer, DataSession dataSession)
+ public ThrottledInputstream(ThrottledConnection connection, Server server, InputStream is, double minimumMillisecondsPerBytePerServer)
{
this.throttledConnection = connection;
this.server = server;
this.inputStream = is;
this.minimumMillisecondsPerBytePerServer = minimumMillisecondsPerBytePerServer;
- this.dataSession = dataSession;
}
/** Read a byte.
@@ -980,8 +836,6 @@ public class ThrottledFetcher
try
{
amt = inputStream.read(b,off,len);
- if (recordEverything && amt != -1)
- dataSession.write(b,off,amt);
return amt;
}
finally
@@ -1337,4 +1191,265 @@ public class ThrottledFetcher
}
+ /** This thread does the actual socket communication with the server.
+ * It's set up so that it can be abandoned at shutdown time.
+ *
+ * The way it works is as follows:
+ * - it starts the transaction
+ * - it receives the response, and saves that for the calling class to inspect
+ * - it transfers the data part to an input stream provided to the calling class
+ * - it shuts the connection down
+ *
+ * If there is an error, the sequence is aborted, and an exception is recorded
+ * for the calling class to examine.
+ *
+ * The calling class basically accepts the sequence above. It starts the
+ * thread, and tries to get a response code. If instead an exception is seen,
+ * the exception is thrown up the stack.
+ */
+ protected static class ExecuteMethodThread extends Thread
+ {
+ /** The connection */
+ protected final ThrottledConnection theConnection;
+ /** The connection bandwidth we want */
+ protected final double minimumMillisecondsPerBytePerServer;
+ /** The server object we use to track connections and fetches. */
+ protected final Server server;
+ /** Client and method, all preconfigured */
+ protected final HttpClient httpClient;
+ protected final HttpRequestBase executeMethod;
+
+ protected HttpResponse response = null;
+ protected Throwable responseException = null;
+ protected XThreadInputStream threadStream = null;
+ protected boolean threadCreated = false;
+ protected Throwable streamException = null;
+
+ protected Throwable shutdownException = null;
+
+ protected Throwable generalException = null;
+
+ public ExecuteMethodThread(ThrottledConnection theConnection, Server server,
+ double minimumMillisecondsPerBytePerServer,
+ HttpClient httpClient, HttpRequestBase executeMethod)
+ {
+ super();
+ setDaemon(true);
+ this.theConnection = theConnection;
+ this.server = server;
+ this.minimumMillisecondsPerBytePerServer = minimumMillisecondsPerBytePerServer;
+ this.httpClient = httpClient;
+ this.executeMethod = executeMethod;
+ }
+
+ public void run()
+ {
+ try
+ {
+ try
+ {
+ // Call the execute method appropriately
+ synchronized (this)
+ {
+ try
+ {
+ response = httpClient.execute(executeMethod);
+ }
+ catch (java.net.SocketTimeoutException e)
+ {
+ responseException = e;
+ }
+ catch (ConnectTimeoutException e)
+ {
+ responseException = e;
+ }
+ catch (InterruptedIOException e)
+ {
+ throw e;
+ }
+ catch (Throwable e)
+ {
+ responseException = e;
+ }
+ this.notifyAll();
+ }
+
+ // Start the transfer of the content
+ if (responseException == null)
+ {
+ synchronized (this)
+ {
+ try
+ {
+ InputStream bodyStream = response.getEntity().getContent();
+ if (bodyStream != null)
+ {
+ bodyStream = new ThrottledInputstream(theConnection,server,bodyStream,minimumMillisecondsPerBytePerServer);
+ threadStream = new XThreadInputStream(bodyStream);
+ }
+ threadCreated = true;
+ }
+ catch (java.net.SocketTimeoutException e)
+ {
+ streamException = e;
+ }
+ catch (ConnectTimeoutException e)
+ {
+ streamException = e;
+ }
+ catch (InterruptedIOException e)
+ {
+ throw e;
+ }
+ catch (Throwable e)
+ {
+ streamException = e;
+ }
+ this.notifyAll();
+ }
+ }
+
+ if (responseException == null && streamException == null)
+ {
+ if (threadStream != null)
+ {
+ // Stuff the content until we are done
+ threadStream.stuffQueue();
+ }
+ }
+
+ }
+ finally
+ {
+ synchronized (this)
+ {
+ try
+ {
+ executeMethod.abort();
+ }
+ catch (Throwable e)
+ {
+ shutdownException = e;
+ }
+ this.notifyAll();
+ }
+ }
+ }
+ catch (Throwable e)
+ {
+ // We catch exceptions here that should ONLY be InterruptedExceptions, as a result of the thread being aborted.
+ this.generalException = e;
+ }
+ }
+
+ public int getResponseCode()
+ throws InterruptedException, IOException, HttpException
+ {
+ // Must wait until the response object is there
+ while (true)
+ {
+ synchronized (this)
+ {
+ checkException(responseException);
+ if (response != null)
+ return response.getStatusLine().getStatusCode();
+ wait();
+ }
+ }
+ }
+
+ public String getFirstHeader(String headerName)
+ throws InterruptedException, IOException, HttpException
+ {
+ // Must wait for the response object to appear
+ while (true)
+ {
+ synchronized (this)
+ {
+ checkException(responseException);
+ if (response != null)
+ {
+ Header h = response.getFirstHeader(headerName);
+ if (h == null)
+ return null;
+ return h.getValue();
+ }
+ wait();
+ }
+ }
+ }
+
+ public InputStream getSafeInputStream()
+ throws InterruptedException, IOException, HttpException
+ {
+ // Must wait until stream is created, or until we note an exception was thrown.
+ while (true)
+ {
+ synchronized (this)
+ {
+ if (responseException != null)
+ throw new IllegalStateException("Check for response before getting stream");
+ checkException(streamException);
+ if (threadCreated)
+ return threadStream;
+ wait();
+ }
+ }
+ }
+
+ public void abort()
+ throws InterruptedException, IOException, HttpException
+ {
+ // This will be called during the stream access, either
+ // in addition to getSafeInputStream or in exchange.
+ // So we wait for the stream, and when we have it we
+ // kill it, and that will cause the whole thread to abort,
+ // if it isn't already done.
+ while (true)
+ {
+ synchronized (this)
+ {
+ if (responseException != null)
+ throw new IllegalStateException("Check for response before aborting stream");
+ checkException(streamException);
+ if (threadCreated)
+ {
+ if (threadStream != null)
+ threadStream.abort();
+ return;
+ }
+ wait();
+ }
+ }
+ }
+
+ public void finishUp()
+ throws InterruptedException, IOException, HttpException
+ {
+ join();
+ checkException(shutdownException);
+ }
+
+ protected synchronized void checkException(Throwable exception)
+ throws IOException, HttpException
+ {
+ if (exception != null)
+ {
+ // Throw the current exception, but clear it, so no further throwing is possible on the same problem.
+ Throwable e = exception;
+ if (e instanceof IOException)
+ throw (IOException)e;
+ else if (e instanceof HttpException)
+ throw (HttpException)e;
+ else if (e instanceof RuntimeException)
+ throw (RuntimeException)e;
+ else if (e instanceof Error)
+ throw (Error)e;
+ else
+ throw new RuntimeException("Unhandled exception of type: "+e.getClass().getName(),e);
+ }
+ }
+
+ }
+
}
Modified: manifoldcf/branches/CONNECTORS-120/framework/core/src/main/java/org/apache/manifoldcf/core/common/XThreadInputStream.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-120/framework/core/src/main/java/org/apache/manifoldcf/core/common/XThreadInputStream.java?rev=1410236&r1=1410235&r2=1410236&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-120/framework/core/src/main/java/org/apache/manifoldcf/core/common/XThreadInputStream.java (original)
+++ manifoldcf/branches/CONNECTORS-120/framework/core/src/main/java/org/apache/manifoldcf/core/common/XThreadInputStream.java Fri Nov 16 07:57:44 2012
@@ -30,7 +30,9 @@ public class XThreadInputStream extends
private int startPoint = 0;
private int byteCount = 0;
private boolean streamEnd = false;
+ private IOException failureException = null;
private InputStream sourceStream;
+ private boolean abort = false;
/** Constructor */
public XThreadInputStream(InputStream sourceStream)
@@ -38,6 +40,17 @@ public class XThreadInputStream extends
this.sourceStream = sourceStream;
}
+ /** Call this method to abort the stuffQueue() method.
+ */
+ public void abort()
+ {
+ synchronized (this)
+ {
+ abort = true;
+ notifyAll();
+ }
+ }
+
/** This method is called from the helper thread side, to keep the queue
* stuffed. It exits when the stream is empty, or when interrupted.
*/
@@ -50,7 +63,7 @@ public class XThreadInputStream extends
int readStartPoint;
synchronized (this)
{
- if (streamEnd)
+ if (streamEnd || abort)
return;
// Calculate amount to read
maxToRead = buffer.length - byteCount;
@@ -65,10 +78,23 @@ public class XThreadInputStream extends
// See how to break up the reads into pieces. We only do one piece right now.
if (readStartPoint + maxToRead >= buffer.length)
maxToRead = buffer.length - readStartPoint;
- int amt = sourceStream.read(buffer, readStartPoint, maxToRead);
+
+ int amt = -1;
+ IOException exception = null;
+ try
+ {
+ amt = sourceStream.read(buffer, readStartPoint, maxToRead);
+ }
+ catch (IOException e)
+ {
+ exception = e;
+ }
+
synchronized (this)
{
- if (amt == -1)
+ if (exception != null)
+ failureException = exception;
+ else if (amt == -1)
streamEnd = true;
else
byteCount += amt;
@@ -129,6 +155,8 @@ public class XThreadInputStream extends
return totalAmt;
return -1;
}
+ if (failureException != null)
+ throw failureException;
wait();
continue;
}