You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/17 22:12:43 UTC
svn commit: r1551707 [1/2] - in /manifoldcf/trunk: ./ connectors/rss/
connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/
connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler...
Author: kwright
Date: Tue Dec 17 21:12:42 2013
New Revision: 1551707
URL: http://svn.apache.org/r1551707
Log:
Part of CONNECTORS-829. Refactor RSS and Web connectors to use a common subsystem in core.
Added:
manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IConnectionThrottler.java
- copied unchanged from r1551705, manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IConnectionThrottler.java
manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java
- copied unchanged from r1551705, manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java
manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IStreamThrottler.java
- copied unchanged from r1551705, manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IStreamThrottler.java
manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IThrottleGroups.java
- copied unchanged from r1551705, manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IThrottleGroups.java
manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IThrottleSpec.java
- copied unchanged from r1551705, manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IThrottleSpec.java
manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/ThrottleGroupsFactory.java
- copied unchanged from r1551705, manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/ThrottleGroupsFactory.java
manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/
- copied from r1551705, manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/
manifoldcf/trunk/framework/core/src/test/java/org/apache/manifoldcf/core/throttler/
- copied from r1551705, manifoldcf/branches/CONNECTORS-829/framework/core/src/test/java/org/apache/manifoldcf/core/throttler/
Modified:
manifoldcf/trunk/ (props changed)
manifoldcf/trunk/connectors/rss/ (props changed)
manifoldcf/trunk/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/RSSConnector.java
manifoldcf/trunk/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/Robots.java
manifoldcf/trunk/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/ThrottledFetcher.java
manifoldcf/trunk/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/IThrottledConnection.java
manifoldcf/trunk/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottleDescription.java
manifoldcf/trunk/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottledFetcher.java
manifoldcf/trunk/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/WebcrawlerConnector.java
manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/IdleCleanupThread.java
manifoldcf/trunk/framework/build.xml
manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/system/ManifoldCF.java
Propchange: manifoldcf/trunk/
------------------------------------------------------------------------------
Merged /manifoldcf/branches/CONNECTORS-829:r1550233-1551705
Propchange: manifoldcf/trunk/connectors/rss/
------------------------------------------------------------------------------
Merged /manifoldcf/branches/CONNECTORS-829/connectors/rss:r1550233-1551705
Modified: manifoldcf/trunk/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/RSSConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/RSSConnector.java?rev=1551707&r1=1551706&r2=1551707&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/RSSConnector.java (original)
+++ manifoldcf/trunk/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/RSSConnector.java Tue Dec 17 21:12:42 2013
@@ -52,7 +52,7 @@ public class RSSConnector extends org.ap
{
public static final String _rcsid = "@(#)$Id: RSSConnector.java 994959 2010-09-08 10:04:42Z kwright $";
-
+ protected final static String rssThrottleGroupType = "_RSS_";
// Usage flag values
protected static final int ROBOTS_NONE = 0;
@@ -105,7 +105,7 @@ public class RSSConnector extends org.ap
protected Robots robots = null;
/** Storage for fetcher objects */
- protected static Map fetcherMap = new HashMap();
+ protected static Map<String,ThrottledFetcher> fetcherMap = new HashMap<String,ThrottledFetcher>();
/** Storage for robots objects */
protected static Map robotsMap = new HashMap();
@@ -231,10 +231,16 @@ public class RSSConnector extends org.ap
}
+ IThrottleGroups tg = ThrottleGroupsFactory.make(currentContext);
+ // Create the throttle group
+ tg.createOrUpdateThrottleGroup(rssThrottleGroupType, throttleGroupName, new ThrottleSpec(maxOpenConnectionsPerServer,
+ minimumMillisecondsPerFetchPerServer, minimumMillisecondsPerBytePerServer));
+
isInitialized = true;
}
}
+
/** Return the list of activities that this connector supports (i.e. writes into the log).
*@return the list.
*/
@@ -936,11 +942,9 @@ public class RSSConnector extends org.ap
String pathPart = url.getFile();
// Check with robots to see if it's allowed
- if (robotsUsage >= ROBOTS_DATA && !robots.isFetchAllowed(protocol,port,hostName,url.getPath(),
+ if (robotsUsage >= ROBOTS_DATA && !robots.isFetchAllowed(currentContext,throttleGroupName,
+ protocol,port,hostName,url.getPath(),
userAgent,from,
- minimumMillisecondsPerBytePerServer,
- maxOpenConnectionsPerServer,
- minimumMillisecondsPerFetchPerServer,
proxyHost, proxyPort, proxyAuthDomain, proxyAuthUsername, proxyAuthPassword,
activities, connectionLimit))
{
@@ -955,10 +959,9 @@ public class RSSConnector extends org.ap
{
// Now, use the fetcher, and get the file.
- IThrottledConnection connection = fetcher.createConnection(hostName,
- minimumMillisecondsPerBytePerServer,
- maxOpenConnectionsPerServer,
- minimumMillisecondsPerFetchPerServer,
+ IThrottledConnection connection = fetcher.createConnection(currentContext,
+ throttleGroupName,
+ hostName,
connectionLimit,
feedTimeout,
proxyHost,
@@ -5404,7 +5407,7 @@ public class RSSConnector extends org.ap
{
synchronized (fetcherMap)
{
- ThrottledFetcher tf = (ThrottledFetcher)fetcherMap.get(throttleGroupName);
+ ThrottledFetcher tf = fetcherMap.get(throttleGroupName);
if (tf == null)
{
tf = new ThrottledFetcher();
@@ -5497,6 +5500,47 @@ public class RSSConnector extends org.ap
// Protected classes
+ /** The throttle specification class. Each server name is a different bin in this model.
+ */
+ protected static class ThrottleSpec implements IThrottleSpec
+ {
+ protected final int maxOpenConnectionsPerServer;
+ protected final long minimumMillisecondsPerFetchPerServer;
+ protected final double minimumMillisecondsPerBytePerServer;
+
+ public ThrottleSpec(int maxOpenConnectionsPerServer, long minimumMillisecondsPerFetchPerServer,
+ double minimumMillisecondsPerBytePerServer)
+ {
+ this.maxOpenConnectionsPerServer = maxOpenConnectionsPerServer;
+ this.minimumMillisecondsPerFetchPerServer = minimumMillisecondsPerFetchPerServer;
+ this.minimumMillisecondsPerBytePerServer = minimumMillisecondsPerBytePerServer;
+ }
+
+ /** Given a bin name, find the max open connections to use for that bin.
+ *@return Integer.MAX_VALUE if no limit found.
+ */
+ public int getMaxOpenConnections(String binName)
+ {
+ return maxOpenConnectionsPerServer;
+ }
+
+ /** Look up minimum milliseconds per byte for a bin.
+ *@return 0.0 if no limit found.
+ */
+ public double getMinimumMillisecondsPerByte(String binName)
+ {
+ return minimumMillisecondsPerBytePerServer;
+ }
+
+ /** Look up minimum milliseconds for a fetch for a bin.
+ *@return 0 if no limit found.
+ */
+ public long getMinimumMillisecondsPerFetch(String binName)
+ {
+ return minimumMillisecondsPerFetchPerServer;
+ }
+ }
+
/** Name/value class */
protected static class NameValue
{
Modified: manifoldcf/trunk/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/Robots.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/Robots.java?rev=1551707&r1=1551706&r2=1551707&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/Robots.java (original)
+++ manifoldcf/trunk/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/Robots.java Tue Dec 17 21:12:42 2013
@@ -109,10 +109,9 @@ public class Robots
*@param pathString is the path (non-query) part of the URL
*@return true if fetch is allowed, false otherwise.
*/
- public boolean isFetchAllowed(String protocol, int port, String hostName, String pathString,
+ public boolean isFetchAllowed(IThreadContext threadContext, String throttleGroupName,
+ String protocol, int port, String hostName, String pathString,
String userAgent, String from,
- double minimumMillisecondsPerBytePerServer, int maxOpenConnectionsPerServer,
- long minimumMillisecondsPerFetchPerServer,
String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword,
IVersionActivity activities, int connectionLimit)
throws ManifoldCFException, ServiceInterruption
@@ -134,9 +133,9 @@ public class Robots
}
}
- return host.isFetchAllowed(System.currentTimeMillis(),pathString,
+ return host.isFetchAllowed(threadContext,throttleGroupName,
+ System.currentTimeMillis(),pathString,
userAgent,from,
- minimumMillisecondsPerBytePerServer,maxOpenConnectionsPerServer,minimumMillisecondsPerFetchPerServer,
proxyHost, proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword,activities,connectionLimit);
}
@@ -257,10 +256,9 @@ public class Robots
*@param pathString is the path string to check.
*@return true if crawling is allowed, false otherwise.
*/
- public boolean isFetchAllowed(long currentTime, String pathString,
+ public boolean isFetchAllowed(IThreadContext threadContext, String throttleGroupName,
+ long currentTime, String pathString,
String userAgent, String from,
- double minimumMillisecondsPerBytePerServer, int maxOpenConnectionsPerServer,
- long minimumMillisecondsPerFetchPerServer,
String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword,
IVersionActivity activities, int connectionLimit)
throws ServiceInterruption, ManifoldCFException
@@ -323,9 +321,7 @@ public class Robots
if (readingRobots)
// This doesn't need to be synchronized because readingRobots blocks all other threads from getting at this object
- makeValid(currentTime,userAgent,from,
- minimumMillisecondsPerBytePerServer,maxOpenConnectionsPerServer,
- minimumMillisecondsPerFetchPerServer,
+ makeValid(threadContext,throttleGroupName,currentTime,userAgent,from,
proxyHost, proxyPort, proxyAuthDomain, proxyAuthUsername, proxyAuthPassword,
hostName, activities, connectionLimit);
@@ -435,9 +431,8 @@ public class Robots
/** Initialize the record. This method reads the robots file on the specified protocol/host/port,
* and parses it according to the rules.
*/
- protected void makeValid(long currentTime, String userAgent, String from,
- double minimumMillisecondsPerBytePerServer, int maxOpenConnectionsPerServer,
- long minimumMillisecondsPerFetchPerServer,
+ protected void makeValid(IThreadContext threadContext, String throttleGroupName,
+ long currentTime, String userAgent, String from,
String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword,
String hostName, IVersionActivity activities, int connectionLimit)
throws ServiceInterruption, ManifoldCFException
@@ -445,8 +440,8 @@ public class Robots
invalidTime = currentTime + 24L * 60L * 60L * 1000L;
// Do the fetch
- IThrottledConnection connection = fetcher.createConnection(hostName,minimumMillisecondsPerBytePerServer,
- maxOpenConnectionsPerServer,minimumMillisecondsPerFetchPerServer,connectionLimit,ROBOT_TIMEOUT_MILLISECONDS,
+ IThrottledConnection connection = fetcher.createConnection(threadContext,throttleGroupName,
+ hostName,connectionLimit,ROBOT_TIMEOUT_MILLISECONDS,
proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
try
{
Modified: manifoldcf/trunk/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/ThrottledFetcher.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/ThrottledFetcher.java?rev=1551707&r1=1551706&r2=1551707&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/ThrottledFetcher.java (original)
+++ manifoldcf/trunk/connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/ThrottledFetcher.java Tue Dec 17 21:12:42 2013
@@ -88,9 +88,9 @@ public class ThrottledFetcher
/** This is the lock object for that global handle counter */
protected static Integer globalHandleCounterLock = new Integer(0);
- /** This hash maps the server string (without port) to a server object, where
+ /** This hash maps the server string (without port) to a pool throttling object, where
* we can track the statistics and make sure we throttle appropriately */
- protected Map serverMap = new HashMap();
+ protected final Map<String,IConnectionThrottler> serverMap = new HashMap<String,IConnectionThrottler>();
/** Reference count for how many connections to this pool there are */
protected int refCount = 0;
@@ -151,35 +151,25 @@ public class ThrottledFetcher
/** Establish a connection to a specified URL.
* @param serverName is the FQDN of the server, e.g. foo.metacarta.com
- * @param minimumMillisecondsPerBytePerServer is the average number of milliseconds to wait
- * between bytes, on
- * average, over all streams reading from this server. That means that the
- * stream will block on fetch until the number of bytes being fetched, done
- * in the average time interval required for that fetch, would not exceed
- * the desired bandwidth.
- * @param minimumMillisecondsPerFetchPerServer is the number of milliseconds
- * between fetches, as a minimum, on a per-server basis. Set
- * to zero for no limit.
- * @param maxOpenConnectionsPerServer is the maximum number of open connections to allow for a single server.
- * If more than this number of connections would need to be open, then this connection request will block
- * until this number will no longer be exceeded.
* @param connectionLimit is the maximum desired outstanding connections at any one time.
* @param connectionTimeoutMilliseconds is the number of milliseconds to wait for the connection before timing out.
*/
- public synchronized IThrottledConnection createConnection(String serverName, double minimumMillisecondsPerBytePerServer,
- int maxOpenConnectionsPerServer, long minimumMillisecondsPerFetchPerServer, int connectionLimit, int connectionTimeoutMilliseconds,
+ public synchronized IThrottledConnection createConnection(IThreadContext threadContext, String throttleGroupName,
+ String serverName, int connectionLimit, int connectionTimeoutMilliseconds,
String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
throws ManifoldCFException, ServiceInterruption
{
- Server server;
- server = (Server)serverMap.get(serverName);
+ IConnectionThrottler server;
+ server = serverMap.get(serverName);
if (server == null)
{
- server = new Server(serverName);
+ // Create a connection throttler for this server
+ IThrottleGroups tg = ThrottleGroupsFactory.make(threadContext);
+ server = tg.obtainConnectionThrottler(RSSConnector.rssThrottleGroupType, throttleGroupName, new String[]{serverName});
serverMap.put(serverName,server);
}
- return new ThrottledConnection(server,minimumMillisecondsPerBytePerServer,maxOpenConnectionsPerServer,minimumMillisecondsPerFetchPerServer,
+ return new ThrottledConnection(serverName, server,
connectionTimeoutMilliseconds,connectionLimit,
proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
}
@@ -206,14 +196,8 @@ public class ThrottledFetcher
refCount--;
if (refCount == 0)
{
- // Close all the servers one by one
- Iterator iter = serverMap.keySet().iterator();
- while (iter.hasNext())
- {
- String serverName = (String)iter.next();
- Server server = (Server)serverMap.get(serverName);
- server.discard();
- }
+ // Since we don't have any actual pools here, this can be a no-op for now
+ // MHL
serverMap.clear();
}
}
@@ -222,14 +206,12 @@ public class ThrottledFetcher
*/
protected static class ThrottledConnection implements IThrottledConnection
{
- /** The connection bandwidth we want */
- protected final double minimumMillisecondsPerBytePerServer;
- /** The maximum open connections per server */
- protected final int maxOpenConnectionsPerServer;
- /** The minimum time between fetches */
- protected final long minimumMillisecondsPerFetchPerServer;
- /** The server object we use to track connections and fetches. */
- protected final Server server;
+ /** The server fqdn */
+ protected final String serverName;
+ /** The throttling object we use to track connections */
+ protected final IConnectionThrottler connectionThrottler;
+ /** The throttling object we use to track fetches */
+ protected final IFetchThrottler fetchThrottler;
/** Connection timeout in milliseconds */
protected final int connectionTimeoutMilliseconds;
/** The client connection manager */
@@ -259,15 +241,14 @@ public class ThrottledFetcher
/** Constructor.
*/
- public ThrottledConnection(Server server, double minimumMillisecondsPerBytePerServer, int maxOpenConnectionsPerServer,
- long minimumMillisecondsPerFetchPerServer, int connectionTimeoutMilliseconds, int connectionLimit,
+ public ThrottledConnection(String serverName,
+ IConnectionThrottler connectionThrottler,
+ int connectionTimeoutMilliseconds, int connectionLimit,
String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
throws ManifoldCFException
{
- this.minimumMillisecondsPerBytePerServer = minimumMillisecondsPerBytePerServer;
- this.maxOpenConnectionsPerServer = maxOpenConnectionsPerServer;
- this.minimumMillisecondsPerFetchPerServer = minimumMillisecondsPerFetchPerServer;
- this.server = server;
+ this.serverName = serverName;
+ this.connectionThrottler = connectionThrottler;
this.connectionTimeoutMilliseconds = connectionTimeoutMilliseconds;
// Create the https scheme for this connection
@@ -330,7 +311,17 @@ public class ThrottledFetcher
httpClient = localHttpClient;
registerGlobalHandle(connectionLimit);
- server.registerConnection(maxOpenConnectionsPerServer);
+ try
+ {
+ int result = connectionThrottler.waitConnectionAvailable();
+ if (result != IConnectionThrottler.CONNECTION_FROM_CREATION)
+ throw new IllegalStateException("Got back unexpected value from waitForAConnection() of "+result);
+ fetchThrottler = connectionThrottler.getNewConnectionFetchThrottler();
+ }
+ catch (InterruptedException e)
+ {
+ throw new ManifoldCFException(e.getMessage(),ManifoldCFException.INTERRUPTED);
+ }
}
/** Begin the fetch process.
@@ -344,7 +335,8 @@ public class ThrottledFetcher
fetchCounter = 0L;
try
{
- server.beginFetch(minimumMillisecondsPerFetchPerServer);
+ if (fetchThrottler.obtainFetchDocumentPermission() == false)
+ throw new IllegalStateException("obtainFetchDocumentPermission() had unexpected return value");
}
catch (InterruptedException e)
{
@@ -386,7 +378,7 @@ public class ThrottledFetcher
{
StringBuilder sb = new StringBuilder(protocol);
- sb.append("://").append(server.getServerName());
+ sb.append("://").append(serverName);
if (port != -1)
sb.append(":").append(Integer.toString(port));
sb.append(urlPath);
@@ -407,8 +399,8 @@ public class ThrottledFetcher
if (lastModified != null)
executeMethod.setHeader(new BasicHeader("Last-Modified",lastModified));
// Create the execution thread.
- methodThread = new ExecuteMethodThread(this, server,
- minimumMillisecondsPerBytePerServer, httpClient, executeMethod);
+ methodThread = new ExecuteMethodThread(this, fetchThrottler,
+ httpClient, executeMethod);
// Start the method thread, which will start the transaction
try
{
@@ -702,7 +694,6 @@ public class ThrottledFetcher
if (methodThread != null && threadStarted)
methodThread.abort();
long endTime = System.currentTimeMillis();
- server.endFetch();
activities.recordActivity(new Long(startFetchTime),RSSConnector.ACTIVITY_FETCH,
new Long(fetchCounter),myUrl,Integer.toString(statusCode),(throwable==null)?null:throwable.getMessage(),null);
@@ -749,7 +740,7 @@ public class ThrottledFetcher
{
// Clean up the connection pool. This should do the necessary bookkeeping to release the one connection that's sitting there.
connectionManager.shutdown();
- server.releaseConnection();
+ connectionThrottler.noteConnectionDestroyed();
releaseGlobalHandle();
}
@@ -760,23 +751,20 @@ public class ThrottledFetcher
*/
protected static class ThrottledInputstream extends InputStream
{
- /** Stream throttling parameters */
- protected double minimumMillisecondsPerBytePerServer;
- /** The throttled connection we belong to */
- protected ThrottledConnection throttledConnection;
- /** The server object we use to track throttling */
- protected Server server;
+ /** Throttled connection */
+ protected final ThrottledConnection throttledConnection;
+ /** Stream throttler */
+ protected final IStreamThrottler streamThrottler;
/** The stream we are wrapping. */
- protected InputStream inputStream;
+ protected final InputStream inputStream;
/** Constructor.
*/
- public ThrottledInputstream(ThrottledConnection connection, Server server, InputStream is, double minimumMillisecondsPerBytePerServer)
+ public ThrottledInputstream(ThrottledConnection throttledConnection, IStreamThrottler streamThrottler, InputStream is)
{
- this.throttledConnection = connection;
- this.server = server;
+ this.throttledConnection = throttledConnection;
+ this.streamThrottler = streamThrottler;
this.inputStream = is;
- this.minimumMillisecondsPerBytePerServer = minimumMillisecondsPerBytePerServer;
}
/** Read a byte.
@@ -839,7 +827,8 @@ public class ThrottledFetcher
{
try
{
- server.beginRead(len,minimumMillisecondsPerBytePerServer);
+ if (streamThrottler.obtainReadPermission(len) == false)
+ throw new IllegalStateException("Throttler shut down while still active");
int amt = 0;
try
{
@@ -849,10 +838,10 @@ public class ThrottledFetcher
finally
{
if (amt == -1)
- server.endRead(len,0);
+ streamThrottler.releaseReadPermission(len,0);
else
{
- server.endRead(len,amt);
+ streamThrottler.releaseReadPermission(len,amt);
throttledConnection.logFetchCount(amt);
}
}
@@ -909,310 +898,16 @@ public class ThrottledFetcher
public void close()
throws IOException
{
- inputStream.close();
- }
-
- }
-
- /** This class represents the throttling stuff kept around for a single server.
- *
- * In order to calculate
- * the effective "burst" fetches per second and bytes per second, we need to have some idea what the window is.
- * For example, a long hiatus from fetching could cause overuse of the server when fetching resumes, if the
- * window length is too long.
- *
- * One solution to this problem would be to keep a list of the individual fetches as records. Then, we could
- * "expire" a fetch by discarding the old record. However, this is quite memory consumptive for all but the
- * smallest intervals.
- *
- * Another, better, solution is to hook into the start and end of individual fetches. These will, presumably, occur
- * at the fastest possible rate without long pauses spent doing something else. The only complication is that
- * fetches may well overlap, so we need to "reference count" the fetches to know when to reset the counters.
- * For "fetches per second", we can simply make sure we "schedule" the next fetch at an appropriate time, rather
- * than keep records around. The overall rate may therefore be somewhat less than the specified rate, but that's perfectly
- * acceptable.
- *
- * For the "maximum open connections" limit, the best thing would be to establish a separate MultiThreadedConnectionPool
- * for each Server. Then, the limit would be automatic.
- *
- * Some notes on the algorithms used to limit server bandwidth impact
- * ==================================================================
- *
- * In a single connection case, the algorithm we'd want to use works like this. On the first chunk of a series,
- * the total length of time and the number of bytes are recorded. Then, prior to each subsequent chunk, a calculation
- * is done which attempts to hit the bandwidth target by the end of the chunk read, using the rate of the first chunk
- * access as a way of estimating how long it will take to fetch those next n bytes.
- *
- * For a multi-connection case, which this is, it's harder to either come up with a good maximum bandwidth estimate,
- * and harder still to "hit the target", because simultaneous fetches will intrude. The strategy is therefore:
- *
- * 1) The first chunk of any series should proceed without interference from other connections to the same server.
- * The goal here is to get a decent quality estimate without any possibility of overwhelming the server.
- *
- * 2) The bandwidth of the first chunk is treated as the "maximum bandwidth per connection". That is, if other
- * connections are going on, we can presume that each connection will use at most the bandwidth that the first fetch
- * took. Thus, by generating end-time estimates based on this number, we are actually being conservative and
- * using less server bandwidth.
- *
- * 3) For chunks that have started but not finished, we keep track of their size and estimated elapsed time in order to schedule when
- * new chunks from other connections can start.
- *
- */
- protected class Server
- {
- /** The fqdn of the server */
- protected final String serverName;
- /** This is the time of the next allowed fetch (in ms since epoch) */
- protected long nextFetchTime = 0L;
-
- // Bandwidth throttling variables
- /** Reference count for bandwidth variables */
- protected volatile int refCount = 0;
- /** The inverse rate estimate of the first fetch, in ms/byte */
- protected double rateEstimate = 0.0;
- /** Flag indicating whether a rate estimate is needed */
- protected volatile boolean estimateValid = false;
- /** Flag indicating whether rate estimation is in progress yet */
- protected volatile boolean estimateInProgress = false;
- /** The start time of this series */
- protected long seriesStartTime = -1L;
- /** Total actual bytes read in this series; this includes fetches in progress */
- protected long totalBytesRead = -1L;
-
- /** Outstanding connection counter */
- protected volatile int outstandingConnections = 0;
-
- /** Constructor */
- public Server(String serverName)
- {
- this.serverName = serverName;
- }
-
- /** Get the fqdn of the server */
- public String getServerName()
- {
- return serverName;
- }
-
- /** Register an outstanding connection (and wait until it can be obtained before proceeding) */
- public synchronized void registerConnection(int maxOutstandingConnections)
- throws ManifoldCFException
- {
- try
- {
- while (outstandingConnections >= maxOutstandingConnections)
- {
- wait();
- }
- outstandingConnections++;
- }
- catch (InterruptedException e)
- {
- throw new ManifoldCFException("Interrupted: "+e.getMessage(),e,ManifoldCFException.INTERRUPTED);
- }
- }
-
- /** Release an outstanding connection back into the pool */
- public synchronized void releaseConnection()
- {
- outstandingConnections--;
- notifyAll();
- }
-
- /** Note the start of a fetch operation. Call this method just before the actual stream access begins.
- * May wait until schedule allows.
- */
- public void beginFetch(long minimumMillisecondsPerFetchPerServer)
- throws InterruptedException
- {
- //if (Logging.connectors.isTraceEnabled())
- // Logging.connectors.trace("RSS: Note begin fetch for '"+serverName+"'");
- // First, do any waiting, and reschedule as needed
- long waitAmount = 0L;
- long currentTime = System.currentTimeMillis();
-
- // System.out.println("Begin fetch for server "+this.toString()+" with minimum milliseconds per fetch of "+new Long(minimumMillisecondsPerFetchPerServer).toString()+
- // " Current time: "+new Long(currentTime).toString()+ " Next fetch time: "+new Long(nextFetchTime).toString());
-
- synchronized (this)
- {
- if (currentTime < nextFetchTime)
- {
- waitAmount = nextFetchTime-currentTime;
- nextFetchTime = nextFetchTime + minimumMillisecondsPerFetchPerServer;
- }
- else
- nextFetchTime = currentTime + minimumMillisecondsPerFetchPerServer;
- }
- if (waitAmount > 0L)
- {
- if (Logging.connectors.isDebugEnabled())
- Logging.connectors.debug("RSS: Performing a fetch wait for server '"+serverName+"' for "+
- new Long(waitAmount).toString()+" ms.");
- ManifoldCF.sleep(waitAmount);
- }
-
- // System.out.println("For server "+this.toString()+", at "+new Long(System.currentTimeMillis()).toString()+", the next fetch time is now "+new Long(nextFetchTime).toString());
-
- synchronized (this)
- {
- if (refCount == 0)
- {
- // Now, reset bandwidth throttling counters
- estimateValid = false;
- rateEstimate = 0.0;
- totalBytesRead = 0L;
- estimateInProgress = false;
- seriesStartTime = -1L;
- }
- refCount++;
- }
- //if (Logging.connectors.isTraceEnabled())
- // Logging.connectors.trace("RSS: Begin fetch noted for '"+serverName+"'");
-
- }
-
- /** Note the end of a fetch operation. Call this method just after the fetch completes.
- */
- public void endFetch()
- {
- //if (Logging.connectors.isTraceEnabled())
- // Logging.connectors.trace("RSS: Note end fetch for '"+serverName+"'");
-
- synchronized (this)
- {
- refCount--;
- }
-
- //if (Logging.connectors.isTraceEnabled())
- // Logging.connectors.trace("RSS: End fetch noted for '"+serverName+"'");
-
- }
-
- /** Note the start of an individual byte read of a specified size. Call this method just before the
- * read request takes place. Performs the necessary delay prior to reading specified number of bytes from the server.
- */
- public void beginRead(int byteCount, double minimumMillisecondsPerBytePerServer)
- throws InterruptedException
- {
- //if (Logging.connectors.isTraceEnabled())
- // Logging.connectors.trace("RSS: Note begin read for '"+serverName+"'");
-
- long currentTime = System.currentTimeMillis();
-
- synchronized (this)
- {
- while (estimateInProgress)
- wait();
- if (estimateValid == false)
- {
- seriesStartTime = currentTime;
- estimateInProgress = true;
- // Add these bytes to the estimated total
- totalBytesRead += (long)byteCount;
- // Exit early; this thread isn't going to do any waiting
- //if (Logging.connectors.isTraceEnabled())
- // Logging.connectors.trace("RSS: Read begin noted; gathering stats for '"+serverName+"'");
-
- return;
- }
- }
-
- // It is possible for the following code to get interrupted. If that happens,
- // we have to unstick the threads that are waiting on the estimate!
- boolean finished = false;
try
{
- long waitTime = 0L;
- synchronized (this)
- {
- // Add these bytes to the estimated total
- totalBytesRead += (long)byteCount;
-
- // Estimate the time this read will take, and wait accordingly
- long estimatedTime = (long)(rateEstimate * (double)byteCount);
-
- // Figure out how long the total byte count should take, to meet the constraint
- long desiredEndTime = seriesStartTime + (long)(((double)totalBytesRead) * minimumMillisecondsPerBytePerServer);
-
- // The wait time is the different between our desired end time, minus the estimated time to read the data, and the
- // current time. But it can't be negative.
- waitTime = (desiredEndTime - estimatedTime) - currentTime;
- }
-
- if (waitTime > 0L)
- {
- if (Logging.connectors.isDebugEnabled())
- Logging.connectors.debug("RSS: Performing a read wait on server '"+serverName+"' of "+
- new Long(waitTime).toString()+" ms.");
- ManifoldCF.sleep(waitTime);
- }
-
- //if (Logging.connectors.isTraceEnabled())
- // Logging.connectors.trace("RSS: Begin read noted for '"+serverName+"'");
- finished = true;
+ inputStream.close();
}
finally
{
- if (!finished)
- {
- abortRead();
- }
- }
- }
-
- /** Abort a read in progress.
- */
- public void abortRead()
- {
- synchronized (this)
- {
- if (estimateInProgress)
- {
- estimateInProgress = false;
- notifyAll();
- }
+ streamThrottler.closeStream();
}
}
- /** Note the end of an individual read from the server. Call this just after an individual read completes.
- * Pass the actual number of bytes read to the method.
- */
- public void endRead(int originalCount, int actualCount)
- {
- //if (Logging.connectors.isTraceEnabled())
- // Logging.connectors.trace("RSS: Note end read for '"+serverName+"'");
-
- long currentTime = System.currentTimeMillis();
-
- synchronized (this)
- {
- totalBytesRead = totalBytesRead + (long)actualCount - (long)originalCount;
- if (estimateInProgress)
- {
- if (actualCount == 0)
- // Didn't actually get any bytes, so use 0.0
- rateEstimate = 0.0;
- else
- rateEstimate = ((double)(currentTime - seriesStartTime))/(double)actualCount;
- estimateValid = true;
- estimateInProgress = false;
- notifyAll();
- }
- }
-
- //if (Logging.connectors.isTraceEnabled())
- // Logging.connectors.trace("RSS: End read noted for '"+serverName+"'");
-
- }
-
- /** Discard this server.
- */
- public void discard()
- {
- // Nothing needed anymore
- }
-
}
/** This thread does the actual socket communication with the server.
@@ -1235,10 +930,8 @@ public class ThrottledFetcher
{
/** The connection */
protected final ThrottledConnection theConnection;
- /** The connection bandwidth we want */
- protected final double minimumMillisecondsPerBytePerServer;
- /** The server object we use to track connections and fetches. */
- protected final Server server;
+ /** The fetch throttler */
+ protected final IFetchThrottler fetchThrottler;
/** Client and method, all preconfigured */
protected final HttpClient httpClient;
protected final HttpRequestBase executeMethod;
@@ -1256,15 +949,13 @@ public class ThrottledFetcher
protected Throwable generalException = null;
- public ExecuteMethodThread(ThrottledConnection theConnection, Server server,
- double minimumMillisecondsPerBytePerServer,
+ public ExecuteMethodThread(ThrottledConnection theConnection, IFetchThrottler fetchThrottler,
HttpClient httpClient, HttpRequestBase executeMethod)
{
super();
setDaemon(true);
this.theConnection = theConnection;
- this.server = server;
- this.minimumMillisecondsPerBytePerServer = minimumMillisecondsPerBytePerServer;
+ this.fetchThrottler = fetchThrottler;
this.httpClient = httpClient;
this.executeMethod = executeMethod;
}
@@ -1316,7 +1007,7 @@ public class ThrottledFetcher
bodyStream = response.getEntity().getContent();
if (bodyStream != null)
{
- bodyStream = new ThrottledInputstream(theConnection,server,bodyStream,minimumMillisecondsPerBytePerServer);
+ bodyStream = new ThrottledInputstream(theConnection,fetchThrottler.createFetchStream(),bodyStream);
threadStream = new XThreadInputStream(bodyStream);
}
streamCreated = true;
Modified: manifoldcf/trunk/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/IThrottledConnection.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/IThrottledConnection.java?rev=1551707&r1=1551706&r2=1551707&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/IThrottledConnection.java (original)
+++ manifoldcf/trunk/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/IThrottledConnection.java Tue Dec 17 21:12:42 2013
@@ -39,6 +39,12 @@ public interface IThrottledConnection
public static final int FETCH_INTERRUPTED = -104;
public static final int FETCH_UNKNOWN_ERROR = -999;
+ /** Check whether the connection has expired.
+ *@param currentTime is the current time to use to judge if a connection has expired.
+ *@return true if the connection has expired, and should be closed.
+ */
+ public boolean hasExpired(long currentTime);
+
/** Begin the fetch process.
* @param fetchType is a short descriptive string describing the kind of fetch being requested. This
* is used solely for logging purposes.
@@ -113,8 +119,13 @@ public interface IThrottledConnection
public void doneFetch(IVersionActivity activities)
throws ManifoldCFException;
- /** Close the connection. Call this to end this server connection.
+ /** Close the connection. Call this to return the connection to
+ * its pool.
*/
- public void close()
- throws ManifoldCFException;
+ public void close();
+
+ /** Destroy the connection. Call this to close the connection.
+ */
+ public void destroy();
+
}
Modified: manifoldcf/trunk/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottleDescription.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottleDescription.java?rev=1551707&r1=1551706&r2=1551707&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottleDescription.java (original)
+++ manifoldcf/trunk/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottleDescription.java Tue Dec 17 21:12:42 2013
@@ -33,13 +33,13 @@ import java.util.regex.*;
* any given bin value as much as possible. For that reason I've organized this structure
* accordingly.
*/
-public class ThrottleDescription
+public class ThrottleDescription implements IThrottleSpec
{
public static final String _rcsid = "@(#)$Id: ThrottleDescription.java 988245 2010-08-23 18:39:35Z kwright $";
/** This is the hash that contains everything. It's keyed by the regexp string itself.
* Values are ThrottleItem's. */
- protected HashMap patternHash = new HashMap();
+ protected Map<String,ThrottleItem> patternHash = new HashMap<String,ThrottleItem>();
/** Constructor. Build the description from the ConfigParams. */
public ThrottleDescription(ConfigParams configData)
@@ -146,17 +146,15 @@ public class ThrottleDescription
}
/** Given a bin name, find the max open connections to use for that bin.
- *@return -1 if no limit found.
+ *@return Integer.MAX_VALUE if no limit found.
*/
+ @Override
public int getMaxOpenConnections(String binName)
{
// Go through the regexps and match; for each match, find the maximum possible.
int maxCount = -1;
- Iterator iter = patternHash.keySet().iterator();
- while (iter.hasNext())
+ for (ThrottleItem ti : patternHash.values())
{
- String binDescription = (String)iter.next();
- ThrottleItem ti = (ThrottleItem)patternHash.get(binDescription);
Integer limit = ti.getMaxOpenConnections();
if (limit != null)
{
@@ -169,22 +167,24 @@ public class ThrottleDescription
}
}
}
+ if (maxCount == -1)
+ maxCount = Integer.MAX_VALUE;
+ else if (maxCount == 0)
+ maxCount = 1;
return maxCount;
}
/** Look up minimum milliseconds per byte for a bin.
*@return 0.0 if no limit found.
*/
+ @Override
public double getMinimumMillisecondsPerByte(String binName)
{
// Go through the regexps and match; for each match, find the maximum possible.
double minMilliseconds = 0.0;
boolean seenSomething = false;
- Iterator iter = patternHash.keySet().iterator();
- while (iter.hasNext())
+ for (ThrottleItem ti : patternHash.values())
{
- String binDescription = (String)iter.next();
- ThrottleItem ti = (ThrottleItem)patternHash.get(binDescription);
Double limit = ti.getMinimumMillisecondsPerByte();
if (limit != null)
{
@@ -206,16 +206,14 @@ public class ThrottleDescription
/** Look up minimum milliseconds for a fetch for a bin.
*@return 0 if no limit found.
*/
+ @Override
public long getMinimumMillisecondsPerFetch(String binName)
{
// Go through the regexps and match; for each match, find the maximum possible.
long minMilliseconds = 0L;
boolean seenSomething = false;
- Iterator iter = patternHash.keySet().iterator();
- while (iter.hasNext())
+ for (ThrottleItem ti : patternHash.values())
{
- String binDescription = (String)iter.next();
- ThrottleItem ti = (ThrottleItem)patternHash.get(binDescription);
Long limit = ti.getMinimumMillisecondsPerFetch();
if (limit != null)
{
@@ -239,7 +237,7 @@ public class ThrottleDescription
protected static class ThrottleItem
{
/** The bin-matching pattern. */
- protected Pattern pattern;
+ protected final Pattern pattern;
/** The minimum milliseconds between bytes, or null if no limit. */
protected Double minimumMillisecondsPerByte = null;
/** The minimum milliseconds per fetch, or null if no limit */