You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2015/01/11 17:40:02 UTC
svn commit: r1650913 - in /manifoldcf/branches/dev_1x: ./ connectors/rss/
connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/
connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcra...
Author: kwright
Date: Sun Jan 11 16:40:01 2015
New Revision: 1650913
URL: http://svn.apache.org/r1650913
Log:
Pull up fix for CONNECTORS-1139 from trunk.
Added:
manifoldcf/branches/dev_1x/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/AbortChecker.java
- copied, changed from r1650911, manifoldcf/trunk/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/AbortChecker.java
manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/BreakException.java (with props)
manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IBreakCheck.java (with props)
Modified:
manifoldcf/branches/dev_1x/ (props changed)
manifoldcf/branches/dev_1x/CHANGES.txt
manifoldcf/branches/dev_1x/connectors/rss/ (props changed)
manifoldcf/branches/dev_1x/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/IThrottledConnection.java
manifoldcf/branches/dev_1x/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/RSSConnector.java
manifoldcf/branches/dev_1x/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/Robots.java
manifoldcf/branches/dev_1x/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/ThrottledFetcher.java
manifoldcf/branches/dev_1x/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/IThrottledConnection.java
manifoldcf/branches/dev_1x/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottledFetcher.java
manifoldcf/branches/dev_1x/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/WebcrawlerConnector.java
manifoldcf/branches/dev_1x/framework/ (props changed)
manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IConnectionThrottler.java
manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java
manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IStreamThrottler.java
manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java
manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java
manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java
manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java
Propchange: manifoldcf/branches/dev_1x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Jan 11 16:40:01 2015
@@ -121,4 +121,4 @@
/manifoldcf/branches/CONNECTORS-981:1605049-1605773
/manifoldcf/branches/CONNECTORS-989:1611600-1612101
/manifoldcf/branches/CONNECTORS-990:1610284-1610707
-/manifoldcf/trunk:1620703,1620748,1620812,1620862,1621449,1621613,1621855,1622213,1622740,1622850,1622853-1622854,1623249,1623251,1623314,1623599,1623951,1623953-1623954,1623956,1623972,1624058,1624085,1624174,1624236,1624377,1624384,1624399,1624449,1624464,1624504,1624729-1624731,1624906,1624909-1624910,1624982,1625023,1625095,1625103,1625108,1625264,1625270,1625394,1625400,1625910,1626090,1626097,1626102,1626638-1626639,1626973,1627687,1627690,1627959,1628046,1628066,1628106,1628168,1628188,1628699,1628798,1628808,1628845,1628905,1629122,1629374-1629375,1629379,1629541,1629994,1630188,1630535,1630623,1630671,1630812,1630885,1631039,1631162,1631164,1631252,1631750,1631953,1632013,1632225,1632289,1632562,1632844,1632847,1632854,1633062-1633063,1633108,1633193,1633202,1633282,1633284,1633295,1633336,1633339,1633345,1633348,1633364,1633378,1633383,1633432,1633546,1633590,1633634,1633668,1633727,1633760,1633764,1633786,1633910,1633923,1634021,1634028,1634067,1634132,1634145,1634148,163
4155,1634188,1634202,1634264,1634373,1634530,1634688,1634850,1634857,1635103,1635116,1635421,1635438,1635478,1635481,1635484,1635490,1635809,1635939,1636146,1636167,1636180,1636207,1636215,1636232,1636334,1636519,1636570,1636684,1636940,1637011,1637310,1637350,1637364,1637373,1637378,1639259,1639593,1639600,1640018,1640101,1640199,1640204,1640208,1640314,1640319,1640749,1640772,1640805,1640888,1640925,1640941-1640942,1641222,1641328,1641557,1641559,1641629,1641633,1641724,1641754,1641911,1642163,1642255,1642318,1642531,1642650,1642658,1642673,1642716,1644197,1644399,1644538,1644920,1644931,1646317,1646397,1646403,1646408,1646640,1646947,1647574,1647585,1647608,1648686,1648976,1649201,1649203,1649529,1649605,1649628,1649794,1650351,1650741-1650742,1650745
+/manifoldcf/trunk:1620703,1620748,1620812,1620862,1621449,1621613,1621855,1622213,1622740,1622850,1622853-1622854,1623249,1623251,1623314,1623599,1623951,1623953-1623954,1623956,1623972,1624058,1624085,1624174,1624236,1624377,1624384,1624399,1624449,1624464,1624504,1624729-1624731,1624906,1624909-1624910,1624982,1625023,1625095,1625103,1625108,1625264,1625270,1625394,1625400,1625910,1626090,1626097,1626102,1626638-1626639,1626973,1627687,1627690,1627959,1628046,1628066,1628106,1628168,1628188,1628699,1628798,1628808,1628845,1628905,1629122,1629374-1629375,1629379,1629541,1629994,1630188,1630535,1630623,1630671,1630812,1630885,1631039,1631162,1631164,1631252,1631750,1631953,1632013,1632225,1632289,1632562,1632844,1632847,1632854,1633062-1633063,1633108,1633193,1633202,1633282,1633284,1633295,1633336,1633339,1633345,1633348,1633364,1633378,1633383,1633432,1633546,1633590,1633634,1633668,1633727,1633760,1633764,1633786,1633910,1633923,1634021,1634028,1634067,1634132,1634145,1634148,163
4155,1634188,1634202,1634264,1634373,1634530,1634688,1634850,1634857,1635103,1635116,1635421,1635438,1635478,1635481,1635484,1635490,1635809,1635939,1636146,1636167,1636180,1636207,1636215,1636232,1636334,1636519,1636570,1636684,1636940,1637011,1637310,1637350,1637364,1637373,1637378,1639259,1639593,1639600,1640018,1640101,1640199,1640204,1640208,1640314,1640319,1640749,1640772,1640805,1640888,1640925,1640941-1640942,1641222,1641328,1641557,1641559,1641629,1641633,1641724,1641754,1641911,1642163,1642255,1642318,1642531,1642650,1642658,1642673,1642716,1644197,1644399,1644538,1644920,1644931,1646317,1646397,1646403,1646408,1646640,1646947,1647574,1647585,1647608,1648686,1648976,1649201,1649203,1649529,1649605,1649628,1649794,1650351,1650741-1650742,1650745,1650911
Modified: manifoldcf/branches/dev_1x/CHANGES.txt
URL: http://svn.apache.org/viewvc/manifoldcf/branches/dev_1x/CHANGES.txt?rev=1650913&r1=1650912&r2=1650913&view=diff
==============================================================================
--- manifoldcf/branches/dev_1x/CHANGES.txt (original)
+++ manifoldcf/branches/dev_1x/CHANGES.txt Sun Jan 11 16:40:01 2015
@@ -3,6 +3,9 @@ $Id$
======================= 1.9-dev =====================
+CONNECTORS-1139: Add support for interruptible throttlers.
+(Karl Wright)
+
CONNECTORS-1138: Loss of all metadata fields but one on pipeline
bifurcation.
(Salih Sen, Karl Wright)
Propchange: manifoldcf/branches/dev_1x/connectors/rss/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Jan 11 16:40:01 2015
@@ -51,4 +51,4 @@
/manifoldcf/branches/CONNECTORS-912/connectors/rss:1579605-1582723
/manifoldcf/branches/CONNECTORS-962/connectors/rss:1602683-1603198
/manifoldcf/branches/CONNECTORS-990/connectors/rss:1610284-1610707
-/manifoldcf/trunk/connectors/rss:1621613,1621855,1622740,1622850,1624906,1628798,1633727,1633764,1634202
+/manifoldcf/trunk/connectors/rss:1621613,1621855,1622740,1622850,1624906,1628798,1633727,1633764,1634202,1650911
Modified: manifoldcf/branches/dev_1x/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/IThrottledConnection.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/dev_1x/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/IThrottledConnection.java?rev=1650913&r1=1650912&r2=1650913&view=diff
==============================================================================
--- manifoldcf/branches/dev_1x/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/IThrottledConnection.java (original)
+++ manifoldcf/branches/dev_1x/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/IThrottledConnection.java Sun Jan 11 16:40:01 2015
@@ -51,7 +51,7 @@ public interface IThrottledConnection
* is used solely for logging purposes.
*/
public void beginFetch(String fetchType)
- throws ManifoldCFException;
+ throws ManifoldCFException, ServiceInterruption;
/** Execute the fetch and get the return code. This method uses the
* standard logging mechanism to keep track of the fetch attempt. It also
@@ -92,7 +92,7 @@ public interface IThrottledConnection
/** Done with the fetch. Call this when the fetch has been completed. A log entry will be generated
* describing what was done.
*/
- public void doneFetch(IVersionActivity activities)
+ public void doneFetch(IProcessActivity activities)
throws ManifoldCFException;
/** Close the connection. Call this to end this server connection.
Modified: manifoldcf/branches/dev_1x/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/RSSConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/dev_1x/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/RSSConnector.java?rev=1650913&r1=1650912&r2=1650913&view=diff
==============================================================================
--- manifoldcf/branches/dev_1x/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/RSSConnector.java (original)
+++ manifoldcf/branches/dev_1x/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/RSSConnector.java Sun Jan 11 16:40:01 2015
@@ -988,7 +988,8 @@ public class RSSConnector extends org.ap
proxyPort,
proxyAuthDomain,
proxyAuthUsername,
- proxyAuthPassword);
+ proxyAuthPassword,
+ activities);
try
{
// Begin the fetch
Modified: manifoldcf/branches/dev_1x/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/Robots.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/dev_1x/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/Robots.java?rev=1650913&r1=1650912&r2=1650913&view=diff
==============================================================================
--- manifoldcf/branches/dev_1x/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/Robots.java (original)
+++ manifoldcf/branches/dev_1x/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/Robots.java Sun Jan 11 16:40:01 2015
@@ -444,7 +444,8 @@ public class Robots
// Do the fetch
IThrottledConnection connection = fetcher.createConnection(threadContext,throttleGroupName,
hostName,connectionLimit,ROBOT_TIMEOUT_MILLISECONDS,
- proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
+ proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword,
+ activities);
try
{
connection.beginFetch(ROBOT_CONNECTION_TYPE);
Modified: manifoldcf/branches/dev_1x/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/ThrottledFetcher.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/dev_1x/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/ThrottledFetcher.java?rev=1650913&r1=1650912&r2=1650913&view=diff
==============================================================================
--- manifoldcf/branches/dev_1x/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/ThrottledFetcher.java (original)
+++ manifoldcf/branches/dev_1x/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/ThrottledFetcher.java Sun Jan 11 16:40:01 2015
@@ -155,7 +155,8 @@ public class ThrottledFetcher
*/
public synchronized IThrottledConnection createConnection(IThreadContext threadContext, String throttleGroupName,
String serverName, int connectionLimit, int connectionTimeoutMilliseconds,
- String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
+ String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword,
+ IAbortActivity activities)
throws ManifoldCFException, ServiceInterruption
{
IConnectionThrottler server;
@@ -170,7 +171,8 @@ public class ThrottledFetcher
return new ThrottledConnection(serverName, server,
connectionTimeoutMilliseconds,connectionLimit,
- proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
+ proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword,
+ activities);
}
/** Poll. This method is designed to allow idle connections to be closed and freed.
@@ -238,18 +240,23 @@ public class ThrottledFetcher
/** Set if thread has been started */
protected boolean threadStarted = false;
+ /** Abort checker */
+ protected final AbortChecker abortChecker;
+
/** Constructor.
*/
public ThrottledConnection(String serverName,
IConnectionThrottler connectionThrottler,
int connectionTimeoutMilliseconds, int connectionLimit,
- String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
- throws ManifoldCFException
+ String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword,
+ IAbortActivity activities)
+ throws ManifoldCFException, ServiceInterruption
{
this.serverName = serverName;
this.connectionThrottler = connectionThrottler;
this.connectionTimeoutMilliseconds = connectionTimeoutMilliseconds;
-
+ this.abortChecker = new AbortChecker(activities);
+
// Create the https scheme for this connection
javax.net.ssl.SSLSocketFactory httpsSocketFactory = KeystoreManagerFactory.getTrustingSecureSocketFactory();;
SSLConnectionSocketFactory myFactory = new SSLConnectionSocketFactory(new InterruptibleSocketFactory(httpsSocketFactory,connectionTimeoutMilliseconds),
@@ -307,35 +314,44 @@ public class ThrottledFetcher
registerGlobalHandle(connectionLimit);
try
{
- int result = connectionThrottler.waitConnectionAvailable();
+ int result = connectionThrottler.waitConnectionAvailable(abortChecker);
if (result != IConnectionThrottler.CONNECTION_FROM_CREATION)
throw new IllegalStateException("Got back unexpected value from waitForAConnection() of "+result);
- fetchThrottler = connectionThrottler.getNewConnectionFetchThrottler();
}
catch (InterruptedException e)
{
throw new ManifoldCFException(e.getMessage(),ManifoldCFException.INTERRUPTED);
}
+ catch (BreakException e)
+ {
+ abortChecker.rethrowExceptions();
+ }
+ fetchThrottler = connectionThrottler.getNewConnectionFetchThrottler();
}
/** Begin the fetch process.
* @param fetchType is a short descriptive string describing the kind of fetch being requested. This
* is used solely for logging purposes.
*/
+ @Override
public void beginFetch(String fetchType)
- throws ManifoldCFException
+ throws ManifoldCFException, ServiceInterruption
{
this.fetchType = fetchType;
fetchCounter = 0L;
try
{
- if (fetchThrottler.obtainFetchDocumentPermission() == false)
+ if (fetchThrottler.obtainFetchDocumentPermission(abortChecker) == false)
throw new IllegalStateException("obtainFetchDocumentPermission() had unexpected return value");
}
catch (InterruptedException e)
{
throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
}
+ catch (BreakException e)
+ {
+ abortChecker.rethrowExceptions();
+ }
threadStarted = false;
}
@@ -366,6 +382,7 @@ public class ThrottledFetcher
* @param lastModified is the requested lastModified header value.
* @return the status code: success, static error, or dynamic error.
*/
+ @Override
public int executeFetch(String protocol, int port, String urlPath, String userAgent, String from,
String lastETag, String lastModified)
throws ManifoldCFException, ServiceInterruption
@@ -533,6 +550,7 @@ public class ThrottledFetcher
/** Get the http response code.
*@return the response code. This is either an HTTP response code, or one of the codes above.
*/
+ @Override
public int getResponseCode()
throws ManifoldCFException, ServiceInterruption
{
@@ -542,6 +560,7 @@ public class ThrottledFetcher
/** Get the response input stream. It is the responsibility of the caller
* to close this stream when done.
*/
+ @Override
public InputStream getResponseBodyStream()
throws ManifoldCFException, ServiceInterruption
{
@@ -611,6 +630,7 @@ public class ThrottledFetcher
*@param headerName is the name of the header.
*@return the header value, or null if it doesn't exist.
*/
+ @Override
public String getResponseHeader(String headerName)
throws ManifoldCFException, ServiceInterruption
{
@@ -679,7 +699,8 @@ public class ThrottledFetcher
/** Done with the fetch. Call this when the fetch has been completed. A log entry will be generated
* describing what was done.
*/
- public void doneFetch(IVersionActivity activities)
+ @Override
+ public void doneFetch(IProcessActivity activities)
throws ManifoldCFException
{
@@ -729,6 +750,7 @@ public class ThrottledFetcher
/** Close the connection. Call this to end this server connection.
*/
+ @Override
public void close()
throws ManifoldCFException
{
@@ -1171,5 +1193,51 @@ public class ThrottledFetcher
}
-
+ /** This class furnishes an abort signal whenever the job activity says it should.
+ * It should never be invoked from a background thread, only from a ManifoldCF thread.
+ */
+ protected static class AbortChecker implements IBreakCheck
+ {
+ protected final IAbortActivity activities;
+ protected ServiceInterruption serviceInterruption = null;
+ protected ManifoldCFException mcfException = null;
+
+ public AbortChecker(IAbortActivity activities)
+ {
+ this.activities = activities;
+ }
+
+ @Override
+ public long abortCheck()
+ throws BreakException, InterruptedException
+ {
+ try
+ {
+ activities.checkJobStillActive();
+ return 1000L;
+ }
+ catch (ServiceInterruption e)
+ {
+ serviceInterruption = e;
+ throw new BreakException("Break requested: "+e.getMessage(),e);
+ }
+ catch (ManifoldCFException e)
+ {
+ if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
+ throw new InterruptedException("Interrupted: "+e.getMessage());
+ mcfException = e;
+ throw new BreakException("Error during break check: "+e.getMessage(),e);
+ }
+ }
+
+ public void rethrowExceptions()
+ throws ManifoldCFException, ServiceInterruption
+ {
+ if (serviceInterruption != null)
+ throw serviceInterruption;
+ if (mcfException != null)
+ throw mcfException;
+ }
+ }
+
}
Copied: manifoldcf/branches/dev_1x/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/AbortChecker.java (from r1650911, manifoldcf/trunk/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/AbortChecker.java)
URL: http://svn.apache.org/viewvc/manifoldcf/branches/dev_1x/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/AbortChecker.java?p2=manifoldcf/branches/dev_1x/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/AbortChecker.java&p1=manifoldcf/trunk/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/AbortChecker.java&r1=1650911&r2=1650913&rev=1650913&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/AbortChecker.java (original)
+++ manifoldcf/branches/dev_1x/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/AbortChecker.java Sun Jan 11 16:40:01 2015
@@ -19,7 +19,6 @@
package org.apache.manifoldcf.crawler.connectors.webcrawler;
import org.apache.manifoldcf.core.interfaces.*;
-import org.apache.manifoldcf.connectorcommon.interfaces.*;
import org.apache.manifoldcf.agents.interfaces.*;
import org.apache.manifoldcf.crawler.interfaces.*;
Modified: manifoldcf/branches/dev_1x/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/IThrottledConnection.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/dev_1x/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/IThrottledConnection.java?rev=1650913&r1=1650912&r2=1650913&view=diff
==============================================================================
--- manifoldcf/branches/dev_1x/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/IThrottledConnection.java (original)
+++ manifoldcf/branches/dev_1x/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/IThrottledConnection.java Sun Jan 11 16:40:01 2015
@@ -39,6 +39,10 @@ public interface IThrottledConnection
public static final int FETCH_INTERRUPTED = -104;
public static final int FETCH_UNKNOWN_ERROR = -999;
+ /** Set the abort checker. This must be done before the connection is actually used.
+ */
+ public void setAbortChecker(AbortChecker abortCheck);
+
/** Check whether the connection has expired.
*@param currentTime is the current time to use to judge if a connection has expired.
*@return true if the connection has expired, and should be closed.
@@ -50,7 +54,7 @@ public interface IThrottledConnection
* is used solely for logging purposes.
*/
public void beginFetch(String fetchType)
- throws ManifoldCFException;
+ throws ManifoldCFException, ServiceInterruption;
/** Execute the fetch and get the return code. This method uses the
* standard logging mechanism to keep track of the fetch attempt. It also
Modified: manifoldcf/branches/dev_1x/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottledFetcher.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/dev_1x/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottledFetcher.java?rev=1650913&r1=1650912&r2=1650913&view=diff
==============================================================================
--- manifoldcf/branches/dev_1x/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottledFetcher.java (original)
+++ manifoldcf/branches/dev_1x/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottledFetcher.java Sun Jan 11 16:40:01 2015
@@ -181,8 +181,9 @@ public class ThrottledFetcher
IKeystoreManager trustStore,
IThrottleSpec throttleDescription, String[] binNames,
int connectionLimit,
- String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
- throws ManifoldCFException
+ String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword,
+ IAbortActivity activities)
+ throws ManifoldCFException, ServiceInterruption
{
// Get a throttle groups handle
IThrottleGroups throttleGroups = ThrottleGroupsFactory.make(threadContext);
@@ -223,14 +224,7 @@ public class ThrottledFetcher
}
}
- try
- {
- return p.grab();
- }
- catch (InterruptedException e)
- {
- throw new ManifoldCFException(e.getMessage(),ManifoldCFException.INTERRUPTED);
- }
+ return p.grab(activities);
}
/** Flush connections that have timed out from inactivity. */
@@ -305,7 +299,9 @@ public class ThrottledFetcher
/** Set if thread has been started */
protected boolean threadStarted = false;
-
+ /** Abort checker */
+ protected AbortChecker abortCheck = null;
+
/** Constructor. Create a connection with a specific server and port, and
* register it as active against all bins. */
public ThrottledConnection(ConnectionPool myPool, IFetchThrottler fetchThrottler,
@@ -327,6 +323,14 @@ public class ThrottledFetcher
this.httpsSocketFactory = httpsSocketFactory;
}
+ /** Set the abort checker. This must be done before the connection is actually used.
+ */
+ @Override
+ public void setAbortChecker(AbortChecker abortCheck)
+ {
+ this.abortCheck = abortCheck;
+ }
+
/** Check whether the connection has expired.
*@param currentTime is the current time to use to judge if a connection has expired.
*@return true if the connection has expired, and should be closed.
@@ -368,19 +372,23 @@ public class ThrottledFetcher
*/
@Override
public void beginFetch(String fetchType)
- throws ManifoldCFException
+ throws ManifoldCFException, ServiceInterruption
{
this.fetchType = fetchType;
this.fetchCounter = 0L;
try
{
- if (fetchThrottler.obtainFetchDocumentPermission() == false)
+ if (fetchThrottler.obtainFetchDocumentPermission(abortCheck) == false)
throw new IllegalStateException("Unexpected return value from obtainFetchDocumentPermission()");
}
catch (InterruptedException e)
{
throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
}
+ catch (BreakException e)
+ {
+ abortCheck.rethrowExceptions();
+ }
}
/** Execute the fetch and get the return code. This method uses the
@@ -1959,28 +1967,44 @@ public class ThrottledFetcher
this.proxyAuthPassword = proxyAuthPassword;
}
- public IThrottledConnection grab()
- throws InterruptedException
+ public IThrottledConnection grab(IAbortActivity activities)
+ throws ManifoldCFException, ServiceInterruption
{
- // Wait for a connection
- int result = connectionThrottler.waitConnectionAvailable();
- if (result == IConnectionThrottler.CONNECTION_FROM_POOL)
+ AbortChecker abortCheck = new AbortChecker(activities);
+ try
{
- // We are guaranteed to have a connection in the pool, unless there's a coding error.
- synchronized (connections)
+ // Wait for a connection
+ IThrottledConnection connection;
+ int result = connectionThrottler.waitConnectionAvailable(abortCheck);
+ if (result == IConnectionThrottler.CONNECTION_FROM_POOL)
+ {
+ // We are guaranteed to have a connection in the pool, unless there's a coding error.
+ synchronized (connections)
+ {
+ connection = connections.remove(connections.size()-1);
+ }
+ }
+ else if (result == IConnectionThrottler.CONNECTION_FROM_CREATION)
{
- return connections.remove(connections.size()-1);
+ connection = new ThrottledConnection(this,connectionThrottler.getNewConnectionFetchThrottler(),
+ protocol,server,port,authentication,baseFactory,
+ proxyHost,proxyPort,
+ proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
}
+ else
+ throw new IllegalStateException("Unexpected return value from waitConnectionAvailable(): "+result);
+ connection.setAbortChecker(abortCheck);
+ return connection;
}
- else if (result == IConnectionThrottler.CONNECTION_FROM_CREATION)
+ catch (InterruptedException e)
{
- return new ThrottledConnection(this,connectionThrottler.getNewConnectionFetchThrottler(),
- protocol,server,port,authentication,baseFactory,
- proxyHost,proxyPort,
- proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
+ throw new ManifoldCFException("Interrupted: "+e.getMessage(),ManifoldCFException.INTERRUPTED);
+ }
+ catch (BreakException e)
+ {
+ abortCheck.rethrowExceptions();
+ return null;
}
- else
- throw new IllegalStateException("Unexpected return value from waitConnectionAvailable(): "+result);
}
public void release(IThrottledConnection connection)
@@ -1994,6 +2018,7 @@ public class ThrottledFetcher
else
{
// Return to pool
+ connection.setAbortChecker(null);
synchronized (connections)
{
connections.add(connection);
Modified: manifoldcf/branches/dev_1x/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/WebcrawlerConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/dev_1x/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/WebcrawlerConnector.java?rev=1650913&r1=1650912&r2=1650913&view=diff
==============================================================================
--- manifoldcf/branches/dev_1x/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/WebcrawlerConnector.java (original)
+++ manifoldcf/branches/dev_1x/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/WebcrawlerConnector.java Sun Jan 11 16:40:01 2015
@@ -887,7 +887,8 @@ public class WebcrawlerConnector extends
throttleGroupName,
protocol,ipAddress,port,
credential,trustStore,throttleDescription,binNames,connectionLimit,
- proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
+ proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword,
+ activities);
try
{
connection.beginFetch((fetchStatus.sessionState == SESSIONSTATE_LOGIN)?FETCH_LOGIN:FETCH_STANDARD);
@@ -5604,7 +5605,8 @@ public class WebcrawlerConnector extends
IThrottledConnection connection = ThrottledFetcher.getConnection(currentContext,throttleGroupName,
protocol,hostIPAddress,port,credential,
trustStore,throttleDescription,binNames,connectionLimit,
- proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
+ proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword,
+ versionActivities);
try
{
connection.beginFetch(FETCH_ROBOTS);
Propchange: manifoldcf/branches/dev_1x/framework/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Jan 11 16:40:01 2015
@@ -113,4 +113,4 @@
/manifoldcf/branches/CONNECTORS-989/framework:1611600-1612101
/manifoldcf/branches/CONNECTORS-990/framework:1610284-1610707
/manifoldcf/trunk:1629122
-/manifoldcf/trunk/framework:1620703,1620748,1620812,1620862,1621449,1621613,1621855,1622213,1622740,1622850,1622853-1622854,1623249,1623251,1623314,1623599,1623951,1623953-1623954,1623956,1623972,1624058,1624085,1624174,1624236,1624377,1624384,1624399,1624449,1624464,1624504,1624729-1624731,1624906,1624909-1624910,1624982,1625023,1625095,1625103,1625108,1625264,1625270,1625394,1625400,1625910,1626090,1626097,1626102,1626638-1626639,1626973,1627687,1627690,1627959,1628046,1628066,1628106,1628168,1628188,1628699,1628798,1628808,1628845,1628905,1629122,1629374-1629375,1629379,1629541,1629994,1630188,1630535,1630623,1630671,1630812,1630885,1631039,1631162,1631164,1631252,1632013,1632289,1632844,1633108,1633193,1633202,1633348,1633364,1634145,1634148,1634155,1634264,1634373,1634530,1635438,1635809,1636146,1636180,1636207,1636232,1636334,1636519,1636570,1636684,1636940,1637011,1639593,1639600,1640018,1640101,1640199,1640314,1640319,1640749,1640772,1640925,1640941,1641222,1641557,1641559,1
641724,1641911,1642163,1642255,1642318,1644197,1644399,1646317,1646397,1646403,1646640,1647574,1647585,1647608,1649605,1650351
+/manifoldcf/trunk/framework:1620703,1620748,1620812,1620862,1621449,1621613,1621855,1622213,1622740,1622850,1622853-1622854,1623249,1623251,1623314,1623599,1623951,1623953-1623954,1623956,1623972,1624058,1624085,1624174,1624236,1624377,1624384,1624399,1624449,1624464,1624504,1624729-1624731,1624906,1624909-1624910,1624982,1625023,1625095,1625103,1625108,1625264,1625270,1625394,1625400,1625910,1626090,1626097,1626102,1626638-1626639,1626973,1627687,1627690,1627959,1628046,1628066,1628106,1628168,1628188,1628699,1628798,1628808,1628845,1628905,1629122,1629374-1629375,1629379,1629541,1629994,1630188,1630535,1630623,1630671,1630812,1630885,1631039,1631162,1631164,1631252,1632013,1632289,1632844,1633108,1633193,1633202,1633348,1633364,1634145,1634148,1634155,1634264,1634373,1634530,1635438,1635809,1636146,1636180,1636207,1636232,1636334,1636519,1636570,1636684,1636940,1637011,1639593,1639600,1640018,1640101,1640199,1640314,1640319,1640749,1640772,1640925,1640941,1641222,1641557,1641559,1
641724,1641911,1642163,1642255,1642318,1644197,1644399,1646317,1646397,1646403,1646640,1647574,1647585,1647608,1649605,1650351,1650911
Added: manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/BreakException.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/BreakException.java?rev=1650913&view=auto
==============================================================================
--- manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/BreakException.java (added)
+++ manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/BreakException.java Sun Jan 11 16:40:01 2015
@@ -0,0 +1,39 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.core.interfaces;
+
+/** Exception signalling that a IBreakCheck abort check caused an abort.
+*/
+public class BreakException extends Exception
+{
+ public BreakException()
+ {
+ super();
+ }
+
+ public BreakException(String msg)
+ {
+ super(msg);
+ }
+
+ public BreakException(String msg, Throwable e)
+ {
+ super(msg,e);
+ }
+}
Propchange: manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/BreakException.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/BreakException.java
------------------------------------------------------------------------------
svn:keywords = Id
Added: manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IBreakCheck.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IBreakCheck.java?rev=1650913&view=auto
==============================================================================
--- manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IBreakCheck.java (added)
+++ manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IBreakCheck.java Sun Jan 11 16:40:01 2015
@@ -0,0 +1,32 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.core.interfaces;
+
+/** Allow for abort checks during long waits.
+*/
+public interface IBreakCheck
+{
+ /** Obtain the maximum time (in milliseconds) that a wait should occur
+ * before checking for an abort. Also check for the abort and throw a BreakException
+ * if the abort should take place.
+ */
+ public long abortCheck()
+ throws BreakException, InterruptedException;
+
+}
Propchange: manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IBreakCheck.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IBreakCheck.java
------------------------------------------------------------------------------
svn:keywords = Id
Modified: manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IConnectionThrottler.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IConnectionThrottler.java?rev=1650913&r1=1650912&r2=1650913&view=diff
==============================================================================
--- manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IConnectionThrottler.java (original)
+++ manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IConnectionThrottler.java Sun Jan 11 16:40:01 2015
@@ -58,7 +58,17 @@ public interface IConnectionThrottler
*/
public int waitConnectionAvailable()
throws InterruptedException;
-
+
+ /** Get permission to grab a connection for use. If this object believes there is a connection
+ * available in the pool, it will update its pool size variable and return If not, this method
+ * evaluates whether a new connection should be created. If neither condition is true, it
+ * waits until a connection is available.
+ *@return whether to take the connection from the pool, or create one, or whether the
+ * throttler is being shut down.
+ */
+ public int waitConnectionAvailable(IBreakCheck breakCheck)
+ throws InterruptedException, BreakException;
+
/** For a new connection, obtain the fetch throttler to use for the connection.
* If the result from waitConnectionAvailable() is CONNECTION_FROM_CREATION,
* the calling code is expected to create a connection using the result of this method.
Modified: manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java?rev=1650913&r1=1650912&r2=1650913&view=diff
==============================================================================
--- manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java (original)
+++ manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java Sun Jan 11 16:40:01 2015
@@ -35,6 +35,14 @@ public interface IFetchThrottler
*/
public boolean obtainFetchDocumentPermission()
throws InterruptedException;
+
+ /** Get permission to fetch a document. This grants permission to start
+ * fetching a single document, within the connection that has already been
+ * granted permission that created this object.
+ *@return false if the throttler is being shut down.
+ */
+ public boolean obtainFetchDocumentPermission(IBreakCheck breakCheck)
+ throws InterruptedException, BreakException;
/** Open a fetch stream. When done (or aborting), call
* IStreamThrottler.closeStream() to note the completion of the document
Modified: manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IStreamThrottler.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IStreamThrottler.java?rev=1650913&r1=1650912&r2=1650913&view=diff
==============================================================================
--- manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IStreamThrottler.java (original)
+++ manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IStreamThrottler.java Sun Jan 11 16:40:01 2015
@@ -36,7 +36,16 @@ public interface IStreamThrottler
*/
public boolean obtainReadPermission(int byteCount)
throws InterruptedException;
-
+
+ /** Obtain permission to read a block of bytes. This method may wait until it is OK to proceed.
+ * The throttle group, bin names, etc are already known
+ * to this specific interface object, so it is unnecessary to include them here.
+ *@param byteCount is the number of bytes to get permissions to read.
+ *@return true if the wait took place as planned, or false if the system is being shut down.
+ */
+ public boolean obtainReadPermission(int byteCount, IBreakCheck breakCheck)
+ throws InterruptedException, BreakException;
+
/** Note the completion of the read of a block of bytes. Call this after
* obtainReadPermission() was successfully called, and bytes were successfully read.
*@param origByteCount is the originally requested number of bytes to get permissions to read.
Modified: manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java?rev=1650913&r1=1650912&r2=1650913&view=diff
==============================================================================
--- manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java (original)
+++ manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java Sun Jan 11 16:40:01 2015
@@ -123,8 +123,8 @@ public class ConnectionBin
* subsequent call to noteConnectionCreation() will be needed to confirm the reservation, or clearReservation() to
* release the reservation.
*/
- public synchronized int waitConnectionAvailable(AtomicInteger poolCount)
- throws InterruptedException
+ public synchronized int waitConnectionAvailable(AtomicInteger poolCount, IBreakCheck breakCheck)
+ throws InterruptedException, BreakException
{
// Reserved connections keep a slot available which can't be used by anyone else.
// Connection bins are always sorted so that deadlocks can't occur.
@@ -150,7 +150,16 @@ public class ConnectionBin
return IConnectionThrottler.CONNECTION_FROM_CREATION;
}
// Wait for a connection to free up. Note that it is up to the caller to free stuff up.
- wait();
+ if (breakCheck == null)
+ {
+ wait();
+ }
+ else
+ {
+ long amt = breakCheck.abortCheck();
+ wait(amt);
+ }
+ // Back around
}
}
Modified: manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java?rev=1650913&r1=1650912&r2=1650913&view=diff
==============================================================================
--- manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java (original)
+++ manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java Sun Jan 11 16:40:01 2015
@@ -99,8 +99,8 @@ public class FetchBin
* has permission to do the fetch, and can update the last fetch time.
*@return false if the fetch bin is being shut down.
*/
- public synchronized boolean reserveFetchRequest()
- throws InterruptedException
+ public synchronized boolean reserveFetchRequest(IBreakCheck breakCheck)
+ throws InterruptedException, BreakException
{
// First wait for the ability to even get the next fetch from this bin
while (true)
@@ -112,7 +112,15 @@ public class FetchBin
reserveNextFetch = true;
return true;
}
- wait();
+ if (breakCheck == null)
+ {
+ wait();
+ }
+ else
+ {
+ long amt = breakCheck.abortCheck();
+ wait(amt);
+ }
}
}
@@ -130,9 +138,10 @@ public class FetchBin
* rights already, via reserveFetchRequest().
*@return false if the wait did not complete because the bin was shut down.
*/
- public synchronized boolean waitNextFetch()
- throws InterruptedException
+ public synchronized boolean waitNextFetch(IBreakCheck breakCheck)
+ throws InterruptedException, BreakException
{
+ // MHL
if (!reserveNextFetch)
throw new IllegalStateException("No fetch request reserved!");
@@ -144,7 +153,16 @@ public class FetchBin
if (localMinimum == Long.MAX_VALUE)
{
// wait forever - but eventually someone will set a smaller interval and wake us up.
- wait();
+ if (breakCheck == null)
+ {
+ wait();
+ }
+ else
+ {
+ long amt = breakCheck.abortCheck();
+ wait(amt);
+ }
+ // Back around
}
else
{
@@ -160,11 +178,22 @@ public class FetchBin
notifyAll();
return true;
}
- wait(waitAmt);
+ if (breakCheck == null)
+ {
+ wait(waitAmt);
+ }
+ else
+ {
+ long amt = breakCheck.abortCheck();
+ if (waitAmt < amt)
+ amt = waitAmt;
+ wait(amt);
+ }
+ // Back around
}
}
}
-
+
/** Poll this bin */
public synchronized void poll(IThreadContext threadContext)
throws ManifoldCFException
Modified: manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java?rev=1650913&r1=1650912&r2=1650913&view=diff
==============================================================================
--- manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java (original)
+++ manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java Sun Jan 11 16:40:01 2015
@@ -173,10 +173,9 @@ public class ThrottleBin
* read request takes place. Performs the necessary delay prior to reading specified number of bytes from the server.
*@return false if the wait was interrupted due to the bin being shut down.
*/
- public boolean beginRead(int byteCount)
- throws InterruptedException
+ public boolean beginRead(int byteCount, IBreakCheck breakCheck)
+ throws InterruptedException, BreakException
{
-
synchronized (this)
{
while (true)
@@ -185,7 +184,15 @@ public class ThrottleBin
return false;
if (estimateInProgress)
{
- wait();
+ if (breakCheck == null)
+ {
+ wait();
+ }
+ else
+ {
+ long amt = breakCheck.abortCheck();
+ wait(amt);
+ }
continue;
}
@@ -205,7 +212,15 @@ public class ThrottleBin
// If we haven't set a proper throttle yet, wait until we do.
if (localMinimum == Double.MAX_VALUE)
{
- wait();
+ if (breakCheck == null)
+ {
+ wait();
+ }
+ else
+ {
+ long amt = breakCheck.abortCheck();
+ wait(amt);
+ }
continue;
}
@@ -228,7 +243,17 @@ public class ThrottleBin
return true;
}
- this.wait(waitTime);
+ if (breakCheck == null)
+ {
+ this.wait(waitTime);
+ }
+ else
+ {
+ long amt = breakCheck.abortCheck();
+ if (waitTime < amt)
+ amt = waitTime;
+ wait(amt);
+ }
// Back around again...
}
}
Modified: manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java?rev=1650913&r1=1650912&r2=1650913&view=diff
==============================================================================
--- manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java (original)
+++ manifoldcf/branches/dev_1x/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java Sun Jan 11 16:40:01 2015
@@ -421,8 +421,8 @@ public class Throttler
* are available in the current pool, across all bins.
*@return the IConnectionThrottler codes for results.
*/
- public int waitConnectionAvailable(String[] binNames, AtomicInteger[] poolCounts)
- throws InterruptedException
+ public int waitConnectionAvailable(String[] binNames, AtomicInteger[] poolCounts, IBreakCheck breakCheck)
+ throws InterruptedException, BreakException
{
// Each bin can signal something different. Bins that signal
// CONNECTION_FROM_NOWHERE are shutting down, but there's also
@@ -456,7 +456,7 @@ public class Throttler
int result;
try
{
- result = bin.waitConnectionAvailable(poolCounts[i]);
+ result = bin.waitConnectionAvailable(poolCounts[i],breakCheck);
}
catch (Throwable e)
{
@@ -471,6 +471,8 @@ public class Throttler
if (bin != null)
bin.undoReservation(currentRecommendation, poolCounts[i]);
}
+ if (e instanceof BreakException)
+ throw (BreakException)e;
if (e instanceof InterruptedException)
throw (InterruptedException)e;
if (e instanceof Error)
@@ -700,8 +702,8 @@ public class Throttler
*@param binNames are the names of the bins.
*@return false if being shut down
*/
- public boolean obtainFetchDocumentPermission(String[] binNames)
- throws InterruptedException
+ public boolean obtainFetchDocumentPermission(String[] binNames, IBreakCheck breakCheck)
+ throws InterruptedException, BreakException
{
// First, make sure all the bins exist, and reserve a slot in each
int i = 0;
@@ -714,9 +716,28 @@ public class Throttler
bin = fetchBins.get(binName);
}
// Reserve a slot
- if (bin == null || !bin.reserveFetchRequest())
+ try
{
- // Release previous reservations, and return null
+ if (bin == null || !bin.reserveFetchRequest(breakCheck))
+ {
+ // Release previous reservations, and return null
+ while (i > 0)
+ {
+ i--;
+ binName = binNames[i];
+ synchronized (fetchBins)
+ {
+ bin = fetchBins.get(binName);
+ }
+ if (bin != null)
+ bin.clearReservation();
+ }
+ return false;
+ }
+ }
+ catch (BreakException e)
+ {
+ // Release previous reservations, and rethrow
while (i > 0)
{
i--;
@@ -728,7 +749,7 @@ public class Throttler
if (bin != null)
bin.clearReservation();
}
- return false;
+ throw e;
}
i++;
}
@@ -746,7 +767,26 @@ public class Throttler
}
if (bin != null)
{
- if (!bin.waitNextFetch())
+ try
+ {
+ if (!bin.waitNextFetch(breakCheck))
+ {
+ // Undo the reservations we haven't processed yet
+ while (i < binNames.length)
+ {
+ binName = binNames[i];
+ synchronized (fetchBins)
+ {
+ bin = fetchBins.get(binName);
+ }
+ if (bin != null)
+ bin.clearReservation();
+ i++;
+ }
+ return false;
+ }
+ }
+ catch (BreakException e)
{
// Undo the reservations we haven't processed yet
while (i < binNames.length)
@@ -760,7 +800,7 @@ public class Throttler
bin.clearReservation();
i++;
}
- return false;
+ throw e;
}
}
i++;
@@ -792,8 +832,8 @@ public class Throttler
*@param byteCount is the number of bytes to get permissions to read.
*@return true if the wait took place as planned, or false if the system is being shut down.
*/
- public boolean obtainReadPermission(String[] binNames, int byteCount)
- throws InterruptedException
+ public boolean obtainReadPermission(String[] binNames, int byteCount, IBreakCheck breakCheck)
+ throws InterruptedException, BreakException
{
int i = 0;
while (i < binNames.length)
@@ -804,7 +844,26 @@ public class Throttler
{
bin = throttleBins.get(binName);
}
- if (bin == null || !bin.beginRead(byteCount))
+ try
+ {
+ if (bin == null || !bin.beginRead(byteCount, breakCheck))
+ {
+ // End bins we've already done, and exit
+ while (i > 0)
+ {
+ i--;
+ binName = binNames[i];
+ synchronized (throttleBins)
+ {
+ bin = throttleBins.get(binName);
+ }
+ if (bin != null)
+ bin.endRead(byteCount,0);
+ }
+ return false;
+ }
+ }
+ catch (BreakException e)
{
// End bins we've already done, and exit
while (i > 0)
@@ -818,7 +877,7 @@ public class Throttler
if (bin != null)
bin.endRead(byteCount,0);
}
- return false;
+ throw e;
}
i++;
}
@@ -988,7 +1047,28 @@ public class Throttler
public int waitConnectionAvailable()
throws InterruptedException
{
- return parent.waitConnectionAvailable(binNames, poolCounts);
+ try
+ {
+ return waitConnectionAvailable(null);
+ }
+ catch (BreakException e)
+ {
+ throw new RuntimeException("Unexpected break exception: "+e.getMessage(),e);
+ }
+ }
+
+ /** Get permission to grab a connection for use. If this object believes there is a connection
+ * available in the pool, it will update its pool size variable and return If not, this method
+ * evaluates whether a new connection should be created. If neither condition is true, it
+ * waits until a connection is available.
+ *@return whether to take the connection from the pool, or create one, or whether the
+ * throttler is being shut down.
+ */
+ @Override
+ public int waitConnectionAvailable(IBreakCheck breakCheck)
+ throws InterruptedException, BreakException
+ {
+ return parent.waitConnectionAvailable(binNames, poolCounts, breakCheck);
}
/** For a new connection, obtain the fetch throttler to use for the connection.
@@ -1087,7 +1167,26 @@ public class Throttler
public boolean obtainFetchDocumentPermission()
throws InterruptedException
{
- return parent.obtainFetchDocumentPermission(binNames);
+ try
+ {
+ return obtainFetchDocumentPermission(null);
+ }
+ catch (BreakException e)
+ {
+ throw new RuntimeException("Unexpected break exception: "+e.getMessage(),e);
+ }
+ }
+
+ /** Get permission to fetch a document. This grants permission to start
+ * fetching a single document, within the connection that has already been
+ * granted permission that created this object.
+ *@return false if the throttler is being shut down.
+ */
+ @Override
+ public boolean obtainFetchDocumentPermission(IBreakCheck breakCheck)
+ throws InterruptedException, BreakException
+ {
+ return parent.obtainFetchDocumentPermission(binNames,breakCheck);
}
/** Open a fetch stream. When done (or aborting), call
@@ -1127,9 +1226,30 @@ public class Throttler
public boolean obtainReadPermission(int byteCount)
throws InterruptedException
{
- return parent.obtainReadPermission(binNames, byteCount);
+ try
+ {
+ return obtainReadPermission(byteCount, null);
+ }
+ catch (BreakException e)
+ {
+ throw new RuntimeException("Unexpected break exception: "+e.getMessage(),e);
+ }
}
+ /** Obtain permission to read a block of bytes. This method may wait until it is OK to proceed.
+ * The throttle group, bin names, etc are already known
+ * to this specific interface object, so it is unnecessary to include them here.
+ *@param byteCount is the number of bytes to get permissions to read.
+ *@param breakCheck is the break check object.
+ *@return true if the wait took place as planned, or false if the system is being shut down.
+ */
+ @Override
+ public boolean obtainReadPermission(int byteCount, IBreakCheck breakCheck)
+ throws InterruptedException, BreakException
+ {
+ return parent.obtainReadPermission(binNames, byteCount, breakCheck);
+ }
+
/** Note the completion of the read of a block of bytes. Call this after
* obtainReadPermission() was successfully called, and bytes were successfully read.
*@param origByteCount is the originally requested number of bytes to get permissions to read.