You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/17 22:12:43 UTC
svn commit: r1551707 [2/2] - in /manifoldcf/trunk: ./ connectors/rss/
connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/
connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler...
Modified: manifoldcf/trunk/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottledFetcher.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottledFetcher.java?rev=1551707&r1=1551706&r2=1551707&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottledFetcher.java (original)
+++ manifoldcf/trunk/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottledFetcher.java Tue Dec 17 21:12:42 2013
@@ -100,6 +100,12 @@ public class ThrottledFetcher
{
public static final String _rcsid = "@(#)$Id: ThrottledFetcher.java 989847 2010-08-26 17:52:30Z kwright $";
+ /** Web throttle group type */
+ protected static final String webThrottleGroupType = "_WEB_";
+
+ /** Idle timeout */
+ protected static final long idleTimeout = 300000L;
+
/** This flag determines whether we record everything to the disk, as a means of doing a web snapshot */
protected static final boolean recordEverything = false;
@@ -109,18 +115,13 @@ public class ThrottledFetcher
protected static final long TIME_6HRS = 6L * 60L * 60000L;
protected static final long TIME_1DAY = 24L * 60L * 60000L;
+ /** The read chunk length */
+ protected static final int READ_CHUNK_LENGTH = 4096;
- // The following static bin pools correspond to global resources that will be managed via ILockManager.
+ /** Connection pools.
+ * This is a static hash of the connection pools in existence. Each connection pool represents a set of identical connections. */
+ protected final static Map<ConnectionPoolKey,ConnectionPool> connectionPools = new HashMap<ConnectionPoolKey,ConnectionPool>();
- /** This is the static pool of ConnectionBin's, keyed by bin name. */
- protected static Map<String,ConnectionBin> connectionBins = new HashMap<String,ConnectionBin>();
- /** This is the static pool of ThrottleBin's, keyed by bin name. */
- protected static Map<String,ThrottleBin> throttleBins = new HashMap<String,ThrottleBin>();
-
- /** This global lock protects the "distributed pool" resource, and insures that a connection
- * can get pulled out of all the right pools and wind up in only the hands of one thread. */
- protected static Integer poolLock = new Integer(0);
-
/** Current host name */
private static String currentHost = null;
static
@@ -138,17 +139,13 @@ public class ThrottledFetcher
}
}
- /** The read chunk length */
- protected static final int READ_CHUNK_LENGTH = 4096;
-
- /** Constructor.
+ /** Constructor. Private since we never instantiate.
*/
- public ThrottledFetcher()
+ private ThrottledFetcher()
{
}
-
/** Obtain a connection to specified protocol, server, and port. We use the protocol because the
* setup for some protocols is extensive (e.g. https) and hopefully would not need to be repeated if
* we distinguish connections based on that.
@@ -164,15 +161,22 @@ public class ThrottledFetcher
*@param connectionLimit isthe maximum number of connections permitted.
*@return an IThrottledConnection object that can be used to fetch from the port.
*/
- public static IThrottledConnection getConnection(String protocol, String server, int port,
+ public static IThrottledConnection getConnection(IThreadContext threadContext, String throttleGroupName,
+ String protocol, String server, int port,
PageCredentials authentication,
IKeystoreManager trustStore,
- ThrottleDescription throttleDescription, String[] binNames,
+ IThrottleSpec throttleDescription, String[] binNames,
int connectionLimit,
String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
throws ManifoldCFException
{
- // Create the https scheme for this connection
+ // Get a throttle groups handle
+ IThrottleGroups throttleGroups = ThrottleGroupsFactory.make(threadContext);
+
+ // Create the appropriate throttle group, or update the throttle description for an existing one
+ throttleGroups.createOrUpdateThrottleGroup(webThrottleGroupType,throttleGroupName,throttleDescription);
+
+ // Create the https scheme and trust store string for this connection
javax.net.ssl.SSLSocketFactory baseFactory;
String trustStoreString;
if (trustStore != null)
@@ -186,781 +190,68 @@ public class ThrottledFetcher
trustStoreString = null;
}
-
- ConnectionBin[] bins = new ConnectionBin[binNames.length];
-
- // Now, start looking for a connection
- int i = 0;
- while (i < binNames.length)
+ // Construct a connection pool key
+ ConnectionPoolKey poolKey = new ConnectionPoolKey(protocol,server,port,authentication,
+ trustStoreString,proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
+
+ ConnectionPool p;
+ synchronized (connectionPools)
{
- String binName = binNames[i];
-
- // Find or create the bin object
- ConnectionBin cb;
- synchronized (connectionBins)
+ p = connectionPools.get(poolKey);
+ if (p == null)
{
- cb = connectionBins.get(binName);
- if (cb == null)
- {
- cb = new ConnectionBin(binName);
- connectionBins.put(binName,cb);
- }
- //cb.sanityCheck();
+ // Construct a new IConnectionThrottler.
+ IConnectionThrottler connectionThrottler =
+ throttleGroups.obtainConnectionThrottler(webThrottleGroupType,throttleGroupName,binNames);
+ p = new ConnectionPool(connectionThrottler,protocol,server,port,authentication,baseFactory,
+ proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
+ connectionPools.put(poolKey,p);
}
- bins[i] = cb;
- i++;
}
-
- ThrottledConnection connectionToReuse;
-
- long startTime = 0L;
- if (Logging.connectors.isDebugEnabled())
+
+ try
{
- startTime = System.currentTimeMillis();
- Logging.connectors.debug("WEB: Waiting to start getting a connection to "+protocol+"://"+server+":"+port);
+ return p.grab();
}
-
- synchronized (poolLock)
+ catch (InterruptedException e)
{
-
- // If the number of outstanding connections is greater than the global limit, close pooled connections until we are under the limit
- long idleTimeout = 64000L;
- while (true)
- {
- int openCount = 0;
-
- // Lock up everything for a moment
- synchronized (connectionBins)
- {
- // Time out connections that have been idle too long. To do this, we need to go through
- // all connection bins and look at the pool
- for (String binName : connectionBins.keySet())
- {
- ConnectionBin cb = connectionBins.get(binName);
- openCount += cb.countConnections();
- }
- }
-
- if (openCount < connectionLimit)
- break;
-
- if (idleTimeout == 0L)
- {
- // Can't actually conclude anything here unfortunately
-
- // Logging.connectors.warn("Web: Exceeding connection limit! Open count = "+Integer.toString(openCount)+"; limit = "+Integer.toString(connectionLimit));
- break;
- }
- idleTimeout = idleTimeout/4L;
-
- // Lock up everything for a moment, since otherwise we could delete something people
- // expect to stick around.
- synchronized (connectionBins)
- {
- // Time out connections that have been idle too long. To do this, we need to go through
- // all connection bins and look at the pool
- for (String binName : connectionBins.keySet())
- {
- ConnectionBin cb = connectionBins.get(binName);
- cb.flushIdleConnections(idleTimeout);
- }
- }
- }
-
- try
- {
- // Retry until we get the connection.
- while (true)
- {
- if (Logging.connectors.isDebugEnabled())
- Logging.connectors.debug("WEB: Attempting to get connection to "+protocol+"://"+server+":"+port+" ("+new Long(System.currentTimeMillis()-startTime).toString()+" ms)");
-
- i = 0;
-
- connectionToReuse = null;
-
- try
- {
-
- // Now, start looking for a connection
- while (i < binNames.length)
- {
- String binName = binNames[i];
- ConnectionBin cb = bins[i];
-
- // Figure out the connection limit for this bin, based on the throttle description
- int maxConnections = throttleDescription.getMaxOpenConnections(binName);
-
- // If no restriction, use a very large value.
- if (maxConnections == -1)
- maxConnections = Integer.MAX_VALUE;
- else if (maxConnections == 0)
- maxConnections = 1;
-
- // Now, do what we need to do to reserve our connection for this bin.
- // If we can't reserve it now, we plan on undoing everything we did, so
- // whatever we do must be reversible. Furthermore, nothing we call here
- // should actually wait(); that will occur if we can't get what we need out
- // here at this level.
-
- if (connectionToReuse != null)
- {
- // We have a reuse candidate already, so just make sure each remaining bin is within
- // its limits.
- cb.insureWithinLimits(maxConnections,connectionToReuse);
- }
- else
- {
- connectionToReuse = cb.findConnection(maxConnections,bins,protocol,server,port,authentication,trustStoreString,
- proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
- }
-
- // Increment after we successfully handled this bin
- i++;
- }
-
- // That loop completed, meaning that we think we got a connection. Now, go through all the bins and make sure there's enough time since the last
- // fetch. If not, we have to clean everything up and try again.
- long currentTime = System.currentTimeMillis();
-
- // Global lock needed to insure that fetch time is updated across all bins simultaneously
- synchronized (connectionBins)
- {
- i = 0;
- while (i < binNames.length)
- {
- String binName = binNames[i];
- ConnectionBin cb = bins[i];
- //cb.sanityCheck();
- // Get the minimum time between fetches for this bin, based on the throttle description
- long minMillisecondsPerFetch = throttleDescription.getMinimumMillisecondsPerFetch(binName);
- if (cb.getLastFetchTime() + minMillisecondsPerFetch > currentTime)
- throw new WaitException(cb.getLastFetchTime() + minMillisecondsPerFetch - currentTime);
- i++;
- }
- i = 0;
- while (i < binNames.length)
- {
- ConnectionBin cb = bins[i++];
- cb.setLastFetchTime(currentTime);
- }
- }
-
- }
- catch (Throwable e)
- {
- // We have to free everything and retry, because otherwise we are subject to deadlock.
- // The only thing we have reserved is the connection, which we must free if there's a
- // problem.
-
- if (connectionToReuse != null)
- {
- // Return this connection to the pool. That is, the pools for all the bins.
- int k = 0;
- while (k < binNames.length)
- {
- String binName = binNames[k++];
- ConnectionBin cb;
- synchronized (connectionBins)
- {
- cb = connectionBins.get(binName);
- if (cb == null)
- {
- cb = new ConnectionBin(binName);
- connectionBins.put(binName,cb);
- }
- }
- //cb.sanityCheck();
- cb.addToPool(connectionToReuse);
- //cb.sanityCheck();
- }
- connectionToReuse = null;
- // We should not need to notify here because nothing has really changed from
- // when the attempt started to get the connection. We just undid what we did.
- }
-
-
- if (e instanceof Error)
- throw (Error)e;
- if (e instanceof ManifoldCFException)
- throw (ManifoldCFException)e;
-
- if (e instanceof WaitException)
- {
- // Wait because we need a certain amount of time after a previous fetch.
- WaitException we = (WaitException)e;
- long waitAmount = we.getWaitAmount();
- if (Logging.connectors.isDebugEnabled())
- Logging.connectors.debug("WEB: Waiting "+new Long(waitAmount).toString()+" ms before starting fetch on "+protocol+"://"+server+":"+port);
- // Really don't want to sleep inside the pool lock!
- // The easiest thing to do instead is to use a timed wait. There is no reason why we need
- // to wake before the wait time is exceeded - but it's harmless, and the alternative is to
- // do more reorganization than probably is wise.
- poolLock.wait(waitAmount);
- continue;
- }
-
- if (e instanceof PoolException)
- {
-
- if (Logging.connectors.isDebugEnabled())
- Logging.connectors.debug("WEB: Going into wait for connection to "+protocol+"://"+server+":"+port+" ("+new Long(System.currentTimeMillis()-startTime).toString()+" ms)");
-
- // Now, wait for something external to change. The only thing that can help us is if
- // some other thread frees a connection.
- poolLock.wait();
- // Go back around and try again.
- continue;
- }
-
- throw new ManifoldCFException("Unexpected exception encountered: "+e.getMessage(),e);
- }
-
- if (Logging.connectors.isDebugEnabled())
- Logging.connectors.debug("WEB: Successfully got connection to "+protocol+"://"+server+":"+port+" ("+new Long(System.currentTimeMillis()-startTime).toString()+" ms)");
-
- // If we have a connection located, activate it.
- if (connectionToReuse == null)
- connectionToReuse = new ThrottledConnection(protocol,server,port,authentication,baseFactory,trustStoreString,bins,
- proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
- connectionToReuse.setup(throttleDescription);
- return connectionToReuse;
- }
- }
- catch (InterruptedException e)
- {
- throw new ManifoldCFException(e.getMessage(),ManifoldCFException.INTERRUPTED);
- }
+ throw new ManifoldCFException(e.getMessage(),ManifoldCFException.INTERRUPTED);
}
}
-
/** Flush connections that have timed out from inactivity. */
- public static void flushIdleConnections()
+ public static void flushIdleConnections(IThreadContext threadContext)
throws ManifoldCFException
{
- synchronized (poolLock)
- {
- // Lock up everything for a moment, since otherwise we could delete something people
- // expect to stick around.
- synchronized (connectionBins)
- {
- // Time out connections that have been idle too long. To do this, we need to go through
- // all connection bins and look at the pool
- for (String binName : connectionBins.keySet())
- {
- ConnectionBin cb = connectionBins.get(binName);
- if (cb.flushIdleConnections(60000L))
- {
- // Bin is no longer doing anything; get rid of it.
- // I've determined this is safe - inUseConnections is designed to prevent any active connection from getting
- // whacked.
- // Oops. Hang results again when I enabled this, so out it goes again.
- //connectionBins.remove(binName);
- //binIter = connectionBins.keySet().iterator();
- }
- }
- }
- }
- }
-
- /** Connection pool for a bin.
- * An instance of this class tracks the connections that are pooled and that are in use for a specific bin.
- * NOTE WELL: This resource must be constrained globally, across all JVMs!
- * To do that, we need an ILockManager to handle the global data for each bin.
- */
- protected static class ConnectionBin
- {
- /** This is the bin name which this connection pool belongs to */
- protected String binName;
- /** This is the number of connections in this bin that are signed out and presumably in use */
- protected int inUseConnections = 0;
- /** This is the last time a fetch was done on this bin */
- protected long lastFetchTime = 0L;
- /** This object is what we synchronize on when we are waiting on a connection to free up for this
- * bin. This is a separate object, because we also want to protect the integrity of the
- * ConnectionBin object itself, for which we'll use the ConnectionBin's synchronizer. */
- protected Integer connectionWait = new Integer(0);
- /** This map contains ThrottledConnection objects that are in the pool, and are not in use. */
- protected HashMap freePool = new HashMap();
-
- /** Constructor. */
- public ConnectionBin(String binName)
- {
- this.binName = binName;
- }
-
- /** Get the bin name. */
- public String getBinName()
- {
- return binName;
- }
-
- /** Note the creation of an active connection that belongs to this bin. The slots all must
- * have been reserved prior to the connection being created.
- */
- public synchronized void noteConnectionCreation()
- {
- inUseConnections++;
- }
-
- /** Note the destruction of an active connection that belongs to this bin.
- */
- public synchronized void noteConnectionDestruction()
- {
- inUseConnections--;
- }
-
-
- /** Activate a connection that should be in the pool.
- * Removes the connection from the pool.
- */
- public synchronized void takeFromPool(ThrottledConnection tc)
- {
- // Remove this connection from the pool list
- freePool.remove(tc);
- inUseConnections++;
- }
-
- /** Put a connection into the pool.
- */
- public synchronized void addToPool(ThrottledConnection tc)
- {
- // Add this connection to the pool list
- freePool.put(tc,tc);
- inUseConnections--;
- }
-
- /** Verify that this bin is within limits.
- */
- public synchronized void insureWithinLimits(int maxConnections, ThrottledConnection existingConnection)
- throws PoolException
- {
- //sanityCheck();
-
- // See if the connection is in fact within the pool; if so, we just presume the limits are fine as they are.
- // This is necessary because if the connection that's being checked for is freed, then we wreck the data structures.
- if (existsInPool(existingConnection))
- return;
-
- while (maxConnections > 0 && inUseConnections + freePool.size() > maxConnections)
- {
- //sanityCheck();
-
- // If there are any pool connections, free them one at a time
- ThrottledConnection freeMe = getPoolConnection();
- if (freeMe != null)
- {
- // It's okay to call activate since we guarantee that only one thread is trying to grab
- // a connection at a time.
- freeMe.activate();
- freeMe.destroy();
- continue;
- }
-
- // Instead of waiting, throw a pool exception, so that we can wait and retry at the next level up.
- throw new PoolException("Waiting for a connection");
- }
- }
-
- /** This method is called only when there is no existing connection yet identified that can be used
- * for contacting the server and port specified. This method returns a connection if a matching one can be found;
- * otherwise it returns null.
- * If a matching connection is found, it is activated before it is returned. That removes the connection from all
- * pools in which it lives.
- */
- public synchronized ThrottledConnection findConnection(int maxConnections,
- ConnectionBin[] binNames, String protocol, String server, int port,
- PageCredentials authentication, String trustStoreString,
- String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
- throws PoolException
- {
- //sanityCheck();
-
- // First, wait until there's no excess.
- while (maxConnections > 0 && inUseConnections + freePool.size() > maxConnections)
- {
- //sanityCheck();
- // If there are any pool connections, free them one at a time
- ThrottledConnection freeMe = getPoolConnection();
- if (freeMe != null)
- {
- // It's okay to call activate since we guarantee that only one thread is trying to grab
- // a connection at a time.
- freeMe.activate();
- freeMe.destroy();
- continue;
- }
-
- // Instead of waiting, throw a pool exception, so that we can wait and retry at the next level up.
- throw new PoolException("Waiting for a connection");
-
- }
-
- // Wait until there's a free one
- if (maxConnections > 0 && inUseConnections > maxConnections-1)
- {
- // Instead of waiting, throw a pool exception, so that we can wait and retry at the next level up.
- throw new PoolException("Waiting for a connection");
- }
-
- // A null return means that there is no existing pooled connection that matches, and the caller is free to create a new connection
- ThrottledConnection rval = getPoolConnection();
- if (rval == null)
- return null;
-
- // It's okay to call activate since we guarantee that only one thread is trying to grab
- // a connection at a time.
- rval.activate();
- //sanityCheck();
-
- if (!rval.matches(binNames,protocol,server,port,authentication,trustStoreString,
- proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword))
- {
- // Destroy old connection. That should free up space for a new creation.
- rval.destroy();
- // Return null to indicate that we can create a new connection now
- return null;
- }
-
- // Existing entry matched. Activate and return it.
- return rval;
- }
-
- /** Note a new time for connection fetch for this pool.
- *@param currentTime is the time the fetch was started.
- */
- public synchronized void setLastFetchTime(long currentTime)
- {
- if (currentTime > lastFetchTime)
- lastFetchTime = currentTime;
- }
-
- /** Get the last fetch time.
- *@return the time.
- */
- public synchronized long getLastFetchTime()
- {
- return lastFetchTime;
- }
-
- /** Count connections that are in use.
- *@return connections that are in use.
- */
- public synchronized int countConnections()
- {
- return freePool.size() + inUseConnections;
- }
-
- /** Flush any idle connections.
- *@return true if the connection bin is now, in fact, empty.
- */
- public synchronized boolean flushIdleConnections(long idleTimeout)
- {
- //sanityCheck();
-
- // We have to time out the pool connections. When there are no pool connections
- // left, AND the in-use counts are zero, we can delete the whole thing.
- Iterator iter = freePool.keySet().iterator();
- while (iter.hasNext())
- {
- ThrottledConnection tc = (ThrottledConnection)iter.next();
- if (tc.flushIdleConnections(idleTimeout))
- {
- // Can delete this connection, since it timed out.
- tc.activate();
- tc.destroy();
- iter = freePool.keySet().iterator();
- }
- }
-
- //sanityCheck();
-
- return (freePool.size() == 0 && inUseConnections == 0);
- }
-
- /** Grab a connection from the current pool. This does not remove the connection from the pool;
- * it just sets it up so that later methods can do that.
- */
- protected ThrottledConnection getPoolConnection()
- {
- if (freePool.size() == 0)
- return null;
- Iterator iter = freePool.keySet().iterator();
- ThrottledConnection rval = (ThrottledConnection)iter.next();
- return rval;
- }
-
- /** Check if a connection exists in the pool already.
- */
- protected boolean existsInPool(ThrottledConnection tc)
- {
- return freePool.get(tc) != null;
- }
-
- public synchronized void sanityCheck()
- {
- // Make sure all the connections in the current pool in fact have a reference to this bin.
- Iterator iter = freePool.keySet().iterator();
- while (iter.hasNext())
- {
- ThrottledConnection tc = (ThrottledConnection)iter.next();
- tc.mustHaveReference(this);
- }
- }
-
- }
-
- /** Throttles for a bin.
- * An instance of this class keeps track of the information needed to bandwidth throttle access
- * to a url belonging to a specific bin.
- *
- * In order to calculate
- * the effective "burst" fetches per second and bytes per second, we need to have some idea what the window is.
- * For example, a long hiatus from fetching could cause overuse of the server when fetching resumes, if the
- * window length is too long.
- *
- * One solution to this problem would be to keep a list of the individual fetches as records. Then, we could
- * "expire" a fetch by discarding the old record. However, this is quite memory consumptive for all but the
- * smallest intervals.
- *
- * Another, better, solution is to hook into the start and end of individual fetches. These will, presumably, occur
- * at the fastest possible rate without long pauses spent doing something else. The only complication is that
- * fetches may well overlap, so we need to "reference count" the fetches to know when to reset the counters.
- * For "fetches per second", we can simply make sure we "schedule" the next fetch at an appropriate time, rather
- * than keep records around. The overall rate may therefore be somewhat less than the specified rate, but that's perfectly
- * acceptable.
- *
- * Some notes on the algorithms used to limit server bandwidth impact
- * ==================================================================
- *
- * In a single connection case, the algorithm we'd want to use works like this. On the first chunk of a series,
- * the total length of time and the number of bytes are recorded. Then, prior to each subsequent chunk, a calculation
- * is done which attempts to hit the bandwidth target by the end of the chunk read, using the rate of the first chunk
- * access as a way of estimating how long it will take to fetch those next n bytes.
- *
- * For a multi-connection case, which this is, it's harder to either come up with a good maximum bandwidth estimate,
- * and harder still to "hit the target", because simultaneous fetches will intrude. The strategy is therefore:
- *
- * 1) The first chunk of any series should proceed without interference from other connections to the same server.
- * The goal here is to get a decent quality estimate without any possibility of overwhelming the server.
- *
- * 2) The bandwidth of the first chunk is treated as the "maximum bandwidth per connection". That is, if other
- * connections are going on, we can presume that each connection will use at most the bandwidth that the first fetch
- * took. Thus, by generating end-time estimates based on this number, we are actually being conservative and
- * using less server bandwidth.
- *
- * 3) For chunks that have started but not finished, we keep track of their size and estimated elapsed time in order to schedule when
- * new chunks from other connections can start.
- *
- * NOTE WELL: This resource must be constrained globally, across all JVMs!
- * To do that, we need an ILockManager to handle the global data for each bin.
- */
- protected static class ThrottleBin
- {
- /** This is the bin name which this throttle belongs to. */
- protected final String binName;
- /** This is the reference count for this bin (which records active references) */
- protected volatile int refCount = 0;
- /** The inverse rate estimate of the first fetch, in ms/byte */
- protected double rateEstimate = 0.0;
- /** Flag indicating whether a rate estimate is needed */
- protected volatile boolean estimateValid = false;
- /** Flag indicating whether rate estimation is in progress yet */
- protected volatile boolean estimateInProgress = false;
- /** The start time of this series */
- protected long seriesStartTime = -1L;
- /** Total actual bytes read in this series; this includes fetches in progress */
- protected long totalBytesRead = -1L;
-
- /** Constructor. */
- public ThrottleBin(String binName)
- {
- this.binName = binName;
- }
-
- /** Get the bin name. */
- public String getBinName()
- {
- return binName;
- }
-
- /** Note the start of a fetch operation for a bin. Call this method just before the actual stream access begins.
- * May wait until schedule allows.
- */
- public void beginFetch()
- throws InterruptedException
+ // Go through outstanding connection pools and clean them up.
+ synchronized (connectionPools)
{
- synchronized (this)
- {
- if (refCount == 0)
- {
- // Now, reset bandwidth throttling counters
- estimateValid = false;
- rateEstimate = 0.0;
- totalBytesRead = 0L;
- estimateInProgress = false;
- seriesStartTime = -1L;
- }
- refCount++;
- }
-
- }
-
- /** Abort the fetch.
- */
- public void abortFetch()
- {
- synchronized (this)
+ for (ConnectionPool pool : connectionPools.values())
{
- refCount--;
+ pool.flushIdleConnections();
}
}
-
- /** Note the start of an individual byte read of a specified size. Call this method just before the
- * read request takes place. Performs the necessary delay prior to reading specified number of bytes from the server.
- */
- public void beginRead(int byteCount, double minimumMillisecondsPerBytePerServer)
- throws InterruptedException
- {
- long currentTime = System.currentTimeMillis();
-
- synchronized (this)
- {
- while (estimateInProgress)
- wait();
- if (estimateValid == false)
- {
- seriesStartTime = currentTime;
- estimateInProgress = true;
- // Add these bytes to the estimated total
- totalBytesRead += (long)byteCount;
- // Exit early; this thread isn't going to do any waiting
- return;
- }
- }
-
- // It is possible for the following code to get interrupted. If that happens,
- // we have to unstick the threads that are waiting on the estimate!
- boolean finished = false;
- try
- {
- long waitTime = 0L;
- synchronized (this)
- {
- // Add these bytes to the estimated total
- totalBytesRead += (long)byteCount;
-
- // Estimate the time this read will take, and wait accordingly
- long estimatedTime = (long)(rateEstimate * (double)byteCount);
-
- // Figure out how long the total byte count should take, to meet the constraint
- long desiredEndTime = seriesStartTime + (long)(((double)totalBytesRead) * minimumMillisecondsPerBytePerServer);
-
- // The wait time is the different between our desired end time, minus the estimated time to read the data, and the
- // current time. But it can't be negative.
- waitTime = (desiredEndTime - estimatedTime) - currentTime;
- }
-
- if (waitTime > 0L)
- {
- if (Logging.connectors.isDebugEnabled())
- Logging.connectors.debug("WEB: Performing a read wait on bin '"+binName+"' of "+
- new Long(waitTime).toString()+" ms.");
- ManifoldCF.sleep(waitTime);
- }
- finished = true;
- }
- finally
- {
- if (!finished)
- {
- abortRead();
- }
- }
- }
-
- /** Abort a read in progress.
- */
- public void abortRead()
- {
- synchronized (this)
- {
- if (estimateInProgress)
- {
- estimateInProgress = false;
- notifyAll();
- }
- }
- }
-
- /** Note the end of an individual read from the server. Call this just after an individual read completes.
- * Pass the actual number of bytes read to the method.
- */
- public void endRead(int originalCount, int actualCount)
- {
- long currentTime = System.currentTimeMillis();
-
- synchronized (this)
- {
- totalBytesRead = totalBytesRead + (long)actualCount - (long)originalCount;
- if (estimateInProgress)
- {
- if (actualCount == 0)
- // Didn't actually get any bytes, so use 0.0
- rateEstimate = 0.0;
- else
- rateEstimate = ((double)(currentTime - seriesStartTime))/(double)actualCount;
- estimateValid = true;
- estimateInProgress = false;
- notifyAll();
- }
- }
- }
-
- /** Note the end of a fetch operation. Call this method just after the fetch completes.
- */
- public boolean endFetch()
- {
- synchronized (this)
- {
- refCount--;
- return (refCount == 0);
- }
-
- }
-
}
/** Throttled connections. Each instance of a connection describes the bins to which it belongs,
* along with the actual open connection itself, and the last time the connection was used. */
protected static class ThrottledConnection implements IThrottledConnection
{
- /** The connection has resolved pointers to the ConnectionBin structures that manage pool
- * maximums. These are ONLY valid when the connection is actually in the pool. */
- protected ConnectionBin[] connectionBinArray;
- /** The connection has resolved pointers to the ThrottleBin structures that help manage
- * bandwidth throttling. */
- protected ThrottleBin[] throttleBinArray;
- /** These are the bandwidth limits, per bin */
- protected double[] minMillisecondsPerByte;
- /** Is the connection considered "active"? */
- protected boolean isActive;
- /** If not active, this is when it went inactive */
- protected long inactiveTime = 0L;
-
+ /** Connection pool */
+ protected final ConnectionPool myPool;
+ /** Fetch throttler */
+ protected final IFetchThrottler fetchThrottler;
/** Protocol */
- protected String protocol;
+ protected final String protocol;
/** Server */
- protected String server;
+ protected final String server;
/** Port */
- protected int port;
+ protected final int port;
/** Authentication */
- protected PageCredentials authentication;
- /** Trust store */
- protected IKeystoreManager trustStore;
- /** Trust store string */
- protected String trustStoreString;
+ protected final PageCredentials authentication;
+
+ /** This is when the connection will expire. Only valid if connection is in the pool. */
+ protected long expireTime = -1L;
/** The http connection manager. The pool is of size 1. */
protected PoolingClientConnectionManager connManager = null;
@@ -975,210 +266,66 @@ public class ThrottledFetcher
/** The status code fetched, if any */
protected int statusCode = FETCH_NOT_TRIED;
/** The kind of fetch we are doing */
- protected String fetchType = null;
- /** The current bytes in the current fetch */
- protected long fetchCounter = 0L;
- /** The start of the current fetch */
- protected long startFetchTime = -1L;
- /** The cookies from the last fetch */
- protected LoginCookies lastFetchCookies = null;
- /** Proxy host */
- protected final String proxyHost;
- /** Proxy port */
- protected final int proxyPort;
- /** Proxy auth domain */
- protected final String proxyAuthDomain;
- /** Proxy auth user name */
- protected final String proxyAuthUsername;
- /** Proxy auth password */
- protected final String proxyAuthPassword;
- /** Https protocol */
- protected final javax.net.ssl.SSLSocketFactory httpsSocketFactory;
-
- /** The thread that is actually doing the work */
- protected ExecuteMethodThread methodThread = null;
- /** Set if thread has been started */
- protected boolean threadStarted = false;
-
-
- /** Constructor. Create a connection with a specific server and port, and
- * register it as active against all bins. */
- public ThrottledConnection(String protocol, String server, int port, PageCredentials authentication,
- javax.net.ssl.SSLSocketFactory httpsSocketFactory, String trustStoreString, ConnectionBin[] connectionBins,
- String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
- {
- this.proxyHost = proxyHost;
- this.proxyPort = proxyPort;
- this.proxyAuthDomain = proxyAuthDomain;
- this.proxyAuthUsername = proxyAuthUsername;
- this.proxyAuthPassword = proxyAuthPassword;
- this.protocol = protocol;
- this.server = server;
- this.port = port;
- this.authentication = authentication;
- this.httpsSocketFactory = httpsSocketFactory;
- this.trustStoreString = trustStoreString;
- this.connectionBinArray = connectionBins;
- this.throttleBinArray = new ThrottleBin[connectionBins.length];
- this.minMillisecondsPerByte = new double[connectionBins.length];
- this.isActive = true;
- int i = 0;
- while (i < connectionBins.length)
- {
- connectionBins[i].noteConnectionCreation();
- // We don't keep throttle bin references around, since these are transient
- throttleBinArray[i] = null;
- minMillisecondsPerByte[i] = 0.0;
- i++;
- }
-
-
- }
-
- public void mustHaveReference(ConnectionBin cb)
- {
- int i = 0;
- while (i < connectionBinArray.length)
- {
- if (cb == connectionBinArray[i])
- return;
- i++;
- }
- String msg = "Connection bin "+cb.toString()+" owns connection "+this.toString()+" for "+protocol+server+":"+port+
- " but there is no back reference!";
- Logging.connectors.error(msg);
- System.out.println(msg);
- new Exception(msg).printStackTrace();
- System.exit(3);
- //throw new RuntimeException(msg);
- }
-
- /** See if this instances matches a given server and port. */
- public boolean matches(ConnectionBin[] bins, String protocol, String server, int port, PageCredentials authentication,
- String trustStoreString, String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
- {
- if (this.trustStoreString == null || trustStoreString == null)
- {
- if (this.trustStoreString != trustStoreString)
- return false;
- }
- else
- {
- if (!this.trustStoreString.equals(trustStoreString))
- return false;
- }
-
- if (this.authentication == null || authentication == null)
- {
- if (this.authentication != authentication)
- return false;
- }
- else
- {
- if (!this.authentication.equals(authentication))
- return false;
- }
-
- if (this.proxyHost == null || proxyHost == null)
- {
- if (this.proxyHost != proxyHost)
- return false;
- }
- else
- {
- if (!this.proxyHost.equals(proxyHost))
- return false;
- if (this.proxyAuthDomain == null || proxyAuthDomain == null)
- {
- if (this.proxyAuthDomain != proxyAuthDomain)
- return false;
- }
- else
- {
- if (!this.proxyAuthDomain.equals(proxyAuthDomain))
- return false;
- }
- if (this.proxyAuthUsername == null || proxyAuthUsername == null)
- {
- if (this.proxyAuthUsername != proxyAuthUsername)
- return false;
- }
- else
- {
- if (!this.proxyAuthUsername.equals(proxyAuthUsername))
- return false;
- }
- if (this.proxyAuthPassword == null || proxyAuthPassword == null)
- {
- if (this.proxyAuthPassword != proxyAuthPassword)
- return false;
- }
- else
- {
- if (!this.proxyAuthPassword.equals(proxyAuthPassword))
- return false;
- }
- }
-
- if (this.proxyPort != proxyPort)
- return false;
-
-
- if (this.connectionBinArray.length != bins.length || !this.protocol.equals(protocol) || !this.server.equals(server) || this.port != port)
- return false;
-
- int i = 0;
- while (i < bins.length)
- {
- if (connectionBinArray[i] != bins[i])
- return false;
- i++;
- }
- return true;
- }
+ protected String fetchType = null;
+ /** The current bytes in the current fetch */
+ protected long fetchCounter = 0L;
+ /** The start of the current fetch */
+ protected long startFetchTime = -1L;
+ /** The cookies from the last fetch */
+ protected LoginCookies lastFetchCookies = null;
+ /** Proxy host */
+ protected final String proxyHost;
+ /** Proxy port */
+ protected final int proxyPort;
+ /** Proxy auth domain */
+ protected final String proxyAuthDomain;
+ /** Proxy auth user name */
+ protected final String proxyAuthUsername;
+ /** Proxy auth password */
+ protected final String proxyAuthPassword;
+ /** Https protocol */
+ protected final javax.net.ssl.SSLSocketFactory httpsSocketFactory;
- /** Activate the connection. */
- public void activate()
- {
- isActive = true;
- int i = 0;
- while (i < connectionBinArray.length)
- {
- connectionBinArray[i++].takeFromPool(this);
- }
- }
+ /** The thread that is actually doing the work */
+ protected ExecuteMethodThread methodThread = null;
+ /** Set if thread has been started */
+ protected boolean threadStarted = false;
+
- /** Set up the connection. This allows us to feed all bins the correct bandwidth limit info.
- */
- public void setup(ThrottleDescription description)
+ /** Constructor. Create a connection with a specific server and port, and
+ * register it as active against all bins. */
+ public ThrottledConnection(ConnectionPool myPool, IFetchThrottler fetchThrottler,
+ String protocol, String server, int port, PageCredentials authentication,
+ javax.net.ssl.SSLSocketFactory httpsSocketFactory,
+ String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
{
- // Go through all bins, and set up the current limits.
- int i = 0;
- while (i < connectionBinArray.length)
- {
- String binName = connectionBinArray[i].getBinName();
- minMillisecondsPerByte[i] = description.getMinimumMillisecondsPerByte(binName);
- i++;
- }
+ this.myPool = myPool;
+ this.fetchThrottler = fetchThrottler;
+ this.proxyHost = proxyHost;
+ this.proxyPort = proxyPort;
+ this.proxyAuthDomain = proxyAuthDomain;
+ this.proxyAuthUsername = proxyAuthUsername;
+ this.proxyAuthPassword = proxyAuthPassword;
+ this.protocol = protocol;
+ this.server = server;
+ this.port = port;
+ this.authentication = authentication;
+ this.httpsSocketFactory = httpsSocketFactory;
}
- /** Do periodic bookkeeping.
- *@return true if the connection is no longer valid, and can be removed. */
- public boolean flushIdleConnections(long idleTimeout)
+ /** Check whether the connection has expired.
+ *@param currentTime is the current time to use to judge if a connection has expired.
+ *@return true if the connection has expired, and should be closed.
+ */
+ @Override
+ public boolean hasExpired(long currentTime)
{
- if (isActive)
- return false;
-
if (connManager != null)
{
connManager.closeIdleConnections(idleTimeout, TimeUnit.MILLISECONDS);
connManager.closeExpiredConnections();
- // Need to determine if there's a valid connection in the connection manager still, or if it is empty.
- //return connManager.getConnectionsInPool() == 0;
- return true;
}
- else
- return true;
+ return (currentTime > expireTime);
}
/** Log the fetch of a number of bytes, from within a stream. */
@@ -1187,65 +334,10 @@ public class ThrottledFetcher
fetchCounter += (long)count;
}
- /** Begin a read operation, from within a stream */
- public void beginRead(int len)
- throws InterruptedException
- {
- // Consult with throttle bins
- int lastOneDone = 0;
- try
- {
- for (int i = 0; i < throttleBinArray.length; i++)
- {
- throttleBinArray[i].beginRead(len,minMillisecondsPerByte[i]);
- lastOneDone = i + 1;
- }
- }
- finally
- {
- if (lastOneDone != throttleBinArray.length)
- {
- for (int i = 0; i < lastOneDone; i++)
- {
- throttleBinArray[i].abortRead();
- }
- }
- }
- }
-
- /** End a read operation, from within a stream */
- public void endRead(int origLen, int actualAmt)
- {
- // Consult with throttle bins
- Throwable e = null;
- for (int i = 0; i < throttleBinArray.length; i++)
- {
- try
- {
- throttleBinArray[i].endRead(origLen,actualAmt);
- }
- catch (Throwable e2)
- {
- e = e2;
- }
- }
- if (e != null)
- {
- if (e instanceof RuntimeException)
- throw (RuntimeException)e;
- else if (e instanceof Error)
- throw (Error)e;
- else
- throw new RuntimeException("Unknown exception: " + e.getMessage(),e);
- }
- }
-
/** Destroy the connection forever */
- protected void destroy()
+ @Override
+ public void destroy()
{
- if (isActive == false)
- throw new RuntimeException("Trying to destroy an inactive connection");
-
// Kill the actual connection object.
if (connManager != null)
{
@@ -1253,13 +345,6 @@ public class ThrottledFetcher
connManager = null;
}
- // Call all the bins this belongs to, and decrement the in-use count.
- int i = 0;
- while (i < connectionBinArray.length)
- {
- ConnectionBin cb = connectionBinArray[i++];
- cb.noteConnectionDestruction();
- }
}
@@ -1273,43 +358,15 @@ public class ThrottledFetcher
{
this.fetchType = fetchType;
this.fetchCounter = 0L;
- int lastCreated = 0;
try
{
- // Find or create the needed throttle bins
- for (int i = 0; i < throttleBinArray.length; i++)
- {
- // Access the bins as we need them, and drop them when ref count goes to zero
- String binName = connectionBinArray[i].getBinName();
- ThrottleBin tb;
- synchronized (throttleBins)
- {
- tb = throttleBins.get(binName);
- if (tb == null)
- {
- tb = new ThrottleBin(binName);
- throttleBins.put(binName,tb);
- }
- tb.beginFetch();
- }
- throttleBinArray[i] = tb;
- lastCreated = i + 1;
- }
+ if (fetchThrottler.obtainFetchDocumentPermission() == false)
+ throw new IllegalStateException("Unexpected return value from obtainFetchDocumentPermission()");
}
catch (InterruptedException e)
{
throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
}
- finally
- {
- if (lastCreated != throttleBinArray.length)
- {
- for (int i = 0; i < lastCreated; i++)
- {
- throttleBinArray[i].abortFetch();
- }
- }
- }
}
/** Execute the fetch and get the return code. This method uses the
@@ -1600,7 +657,7 @@ public class ThrottledFetcher
//httpClient.setCookieStore(cookieStore);
// Create the thread
- methodThread = new ExecuteMethodThread(this, httpClient, fetchMethod, cookieStore);
+ methodThread = new ExecuteMethodThread(this, fetchThrottler, httpClient, fetchMethod, cookieStore);
try
{
methodThread.start();
@@ -1893,17 +950,6 @@ public class ThrottledFetcher
methodThread.abort();
long endTime = System.currentTimeMillis();
- int i = 0;
- while (i < throttleBinArray.length)
- {
- synchronized (throttleBins)
- {
- if (throttleBinArray[i].endFetch())
- throttleBins.remove(throttleBinArray[i].getBinName());
- }
- throttleBinArray[i] = null;
- i++;
- }
activities.recordActivity(new Long(startFetchTime),WebcrawlerConnector.ACTIVITY_FETCH,
new Long(fetchCounter),myUrl,Integer.toString(statusCode),(throwable==null)?null:throwable.getMessage(),null);
@@ -1945,44 +991,13 @@ public class ThrottledFetcher
}
- /** Close the connection. Call this to end this server connection.
+ /** Close the connection. Call this to return the connection to its pool.
*/
@Override
public void close()
- throws ManifoldCFException
{
- synchronized (poolLock)
- {
- // Verify that all the connections that exist are in fact sane
- synchronized (connectionBins)
- {
- for (String connectionName : connectionBins.keySet())
- {
- ConnectionBin cb = connectionBins.get(connectionName);
- //cb.sanityCheck();
- }
- }
-
- // Leave the connection alive, but mark it as inactive, and return it to the appropriate pools.
- isActive = false;
- inactiveTime = System.currentTimeMillis();
- int i = 0;
- while (i < connectionBinArray.length)
- {
- connectionBinArray[i++].addToPool(this);
- }
- // Verify that all the connections that exist are in fact sane
- synchronized (connectionBins)
- {
- for (String connectionName : connectionBins.keySet())
- {
- ConnectionBin cb = connectionBins.get(connectionName);
- //cb.sanityCheck();
- }
- }
- // Wake up everything waiting on the pool lock
- poolLock.notifyAll();
- }
+ expireTime = System.currentTimeMillis() + idleTimeout;
+ myPool.release(this);
}
protected void handleHTTPException(HttpException e, String activity)
@@ -2048,17 +1063,18 @@ public class ThrottledFetcher
*/
protected static class ThrottledInputstream extends InputStream
{
- /** Stream throttling parameters */
- protected double minimumMillisecondsPerBytePerServer;
+ /** Stream throttler */
+ protected final IStreamThrottler streamThrottler;
/** The throttled connection we belong to */
- protected ThrottledConnection throttledConnection;
+ protected final ThrottledConnection throttledConnection;
/** The stream we are wrapping. */
- protected InputStream inputStream;
+ protected final InputStream inputStream;
/** Constructor.
*/
- public ThrottledInputstream(ThrottledConnection connection, InputStream is)
+ public ThrottledInputstream(IStreamThrottler streamThrottler, ThrottledConnection connection, InputStream is)
{
+ this.streamThrottler = streamThrottler;
this.throttledConnection = connection;
this.inputStream = is;
}
@@ -2126,7 +1142,8 @@ public class ThrottledFetcher
{
try
{
- throttledConnection.beginRead(len);
+ if (streamThrottler.obtainReadPermission(len) == false)
+ throw new IllegalStateException("Unexpected result calling obtainReadPermission()");
int amt = 0;
try
{
@@ -2136,10 +1153,10 @@ public class ThrottledFetcher
finally
{
if (amt == -1)
- throttledConnection.endRead(len,0);
+ streamThrottler.releaseReadPermission(len,0);
else
{
- throttledConnection.endRead(len,amt);
+ streamThrottler.releaseReadPermission(len,amt);
throttledConnection.logFetchCount(amt);
}
}
@@ -2226,6 +1243,10 @@ public class ThrottledFetcher
{
Logging.connectors.debug("IO Exception trying to close connection: "+e.getMessage(),e);
}
+ finally
+ {
+ streamThrottler.closeStream();
+ }
}
}
@@ -2298,6 +1319,8 @@ public class ThrottledFetcher
{
/** The connection */
protected final ThrottledConnection theConnection;
+ /** The fetch throttler */
+ protected final IFetchThrottler fetchThrottler;
/** Client and method, all preconfigured */
protected final AbstractHttpClient httpClient;
protected final HttpRequestBase executeMethod;
@@ -2317,12 +1340,13 @@ public class ThrottledFetcher
protected Throwable generalException = null;
- public ExecuteMethodThread(ThrottledConnection theConnection,
+ public ExecuteMethodThread(ThrottledConnection theConnection, IFetchThrottler fetchThrottler,
AbstractHttpClient httpClient, HttpRequestBase executeMethod, CookieStore cookieStore)
{
super();
setDaemon(true);
this.theConnection = theConnection;
+ this.fetchThrottler = fetchThrottler;
this.httpClient = httpClient;
this.executeMethod = executeMethod;
this.cookieStore = cookieStore;
@@ -2419,7 +1443,7 @@ public class ThrottledFetcher
bodyStream = response.getEntity().getContent();
if (bodyStream != null)
{
- bodyStream = new ThrottledInputstream(theConnection,bodyStream);
+ bodyStream = new ThrottledInputstream(fetchThrottler.createFetchStream(),theConnection,bodyStream);
if (gzip)
bodyStream = new GZIPInputStream(bodyStream);
else if (deflate)
@@ -2741,4 +1765,252 @@ public class ThrottledFetcher
}
+  /** Connection pool key.
+  * Immutable value object used to look up a connection pool.  Two keys are equal only when every
+  * one of their fields (protocol, server, port, credentials, trust store, and proxy settings) is equal.
+  */
+  protected static class ConnectionPoolKey
+  {
+    protected final String protocol;
+    protected final String server;
+    protected final int port;
+    protected final PageCredentials authentication;
+    protected final String trustStoreString;
+    protected final String proxyHost;
+    protected final int proxyPort;
+    protected final String proxyAuthDomain;
+    protected final String proxyAuthUsername;
+    protected final String proxyAuthPassword;
+
+    /** Constructor.
+    *@param protocol is the protocol; must not be null, since hashCode() and equals() dereference it.
+    *@param server is the server; must not be null, since hashCode() and equals() dereference it.
+    *@param port is the server port.
+    *@param authentication is the page credentials, or null.
+    *@param trustStoreString is the trust store string, or null.
+    *@param proxyHost is the proxy host, or null.
+    *@param proxyPort is the proxy port.
+    *@param proxyAuthDomain is the proxy authentication domain, or null.
+    *@param proxyAuthUsername is the proxy authentication user name, or null.
+    *@param proxyAuthPassword is the proxy authentication password, or null.
+    */
+    public ConnectionPoolKey(String protocol,
+      String server, int port, PageCredentials authentication,
+      String trustStoreString, String proxyHost, int proxyPort,
+      String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
+    {
+      this.protocol = protocol;
+      this.server = server;
+      this.port = port;
+      this.authentication = authentication;
+      this.trustStoreString = trustStoreString;
+      this.proxyHost = proxyHost;
+      this.proxyPort = proxyPort;
+      this.proxyAuthDomain = proxyAuthDomain;
+      this.proxyAuthUsername = proxyAuthUsername;
+      this.proxyAuthPassword = proxyAuthPassword;
+    }
+
+    /** Calculate a hash code from all fields; null fields contribute zero.
+    *@return the hash code.
+    */
+    @Override
+    public int hashCode()
+    {
+      return protocol.hashCode() +
+        server.hashCode() +
+        (port * 31) +
+        ((authentication==null)?0:authentication.hashCode()) +
+        ((trustStoreString==null)?0:trustStoreString.hashCode()) +
+        ((proxyHost==null)?0:proxyHost.hashCode()) +
+        (proxyPort * 29) +
+        ((proxyAuthDomain==null)?0:proxyAuthDomain.hashCode()) +
+        ((proxyAuthUsername==null)?0:proxyAuthUsername.hashCode()) +
+        ((proxyAuthPassword==null)?0:proxyAuthPassword.hashCode());
+    }
+
+    /** Compare against another object.
+    *@param o is the object to compare against.
+    *@return true if o is a ConnectionPoolKey whose fields all match this one's.
+    */
+    @Override
+    public boolean equals(Object o)
+    {
+      if (!(o instanceof ConnectionPoolKey))
+        return false;
+      ConnectionPoolKey other = (ConnectionPoolKey)o;
+      // The protocol must take part in the comparison, just as it takes part in hashCode().
+      // Otherwise keys differing only in protocol would be treated as the same key.
+      return protocol.equals(other.protocol) &&
+        server.equals(other.server) &&
+        port == other.port &&
+        nullSafeEquals(authentication,other.authentication) &&
+        nullSafeEquals(trustStoreString,other.trustStoreString) &&
+        nullSafeEquals(proxyHost,other.proxyHost) &&
+        proxyPort == other.proxyPort &&
+        nullSafeEquals(proxyAuthDomain,other.proxyAuthDomain) &&
+        nullSafeEquals(proxyAuthUsername,other.proxyAuthUsername) &&
+        nullSafeEquals(proxyAuthPassword,other.proxyAuthPassword);
+    }
+
+    /** Compare two possibly-null values: equal if both are null, or if both are non-null and equals() agrees. */
+    private static boolean nullSafeEquals(Object a, Object b)
+    {
+      if (a == null || b == null)
+        return a == b;
+      return a.equals(b);
+    }
+  }
+
+  /** Each connection pool has identical connections we can draw on.
+  * Idle connections are kept in a list in the order they were returned: release() appends,
+  * grab() takes from the end, and flushIdleConnections() removes from the front (oldest first).
+  * Every decision to hand out, create, pool, or destroy a connection is made by the
+  * IConnectionThrottler supplied at construction.
+  */
+  protected static class ConnectionPool
+  {
+    /** Throttler */
+    protected final IConnectionThrottler connectionThrottler;
+
+    // If we need to create a connection, these are what we use
+
+    protected final String protocol;
+    protected final String server;
+    protected final int port;
+    protected final PageCredentials authentication;
+    protected final javax.net.ssl.SSLSocketFactory baseFactory;
+    protected final String proxyHost;
+    protected final int proxyPort;
+    protected final String proxyAuthDomain;
+    protected final String proxyAuthUsername;
+    protected final String proxyAuthPassword;
+
+    /** The actual pool of connections; all access is synchronized on this list */
+    protected final List<IThrottledConnection> connections = new ArrayList<IThrottledConnection>();
+
+    /** Constructor.  Records the throttler and the parameters used whenever a new connection must be created. */
+    public ConnectionPool(IConnectionThrottler connectionThrottler,
+      String protocol,
+      String server, int port, PageCredentials authentication,
+      javax.net.ssl.SSLSocketFactory baseFactory,
+      String proxyHost, int proxyPort,
+      String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
+    {
+      this.connectionThrottler = connectionThrottler;
+
+      this.protocol = protocol;
+      this.server = server;
+      this.port = port;
+      this.authentication = authentication;
+      this.baseFactory = baseFactory;
+      this.proxyHost = proxyHost;
+      this.proxyPort = proxyPort;
+      this.proxyAuthDomain = proxyAuthDomain;
+      this.proxyAuthUsername = proxyAuthUsername;
+      this.proxyAuthPassword = proxyAuthPassword;
+    }
+
+    /** Obtain a connection, as directed by the throttler: either the most recently returned
+    * pooled connection, or a newly created one.
+    *@return the connection.
+    *@throws InterruptedException as declared by the throttler calls made here.
+    *@throws IllegalStateException if the throttler returns an unrecognized result.
+    */
+    public IThrottledConnection grab()
+      throws InterruptedException
+    {
+      // Wait for a connection
+      int result = connectionThrottler.waitConnectionAvailable();
+      if (result == IConnectionThrottler.CONNECTION_FROM_POOL)
+      {
+        // We are guaranteed to have a connection in the pool, unless there's a coding error.
+        synchronized (connections)
+        {
+          return connections.remove(connections.size()-1);
+        }
+      }
+      else if (result == IConnectionThrottler.CONNECTION_FROM_CREATION)
+      {
+        return new ThrottledConnection(this,connectionThrottler.getNewConnectionFetchThrottler(),
+          protocol,server,port,authentication,baseFactory,
+          proxyHost,proxyPort,
+          proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
+      }
+      else
+        throw new IllegalStateException("Unexpected return value from waitConnectionAvailable(): "+result);
+    }
+
+    /** Give a connection back.  If the throttler's noteReturnedConnection() returns true the
+    * connection is destroyed; otherwise it is appended to the pool.
+    *@param connection is the connection being returned.
+    */
+    public void release(IThrottledConnection connection)
+    {
+      if (connectionThrottler.noteReturnedConnection())
+      {
+        // Destroy this connection
+        connection.destroy();
+        connectionThrottler.noteConnectionDestroyed();
+      }
+      else
+      {
+        // Return to pool
+        synchronized (connections)
+        {
+          connections.add(connection);
+        }
+        connectionThrottler.noteConnectionReturnedToPool();
+      }
+    }
+
+    /** Destroy pooled connections the throttler wants gone, and then pooled connections that have expired.
+    */
+    public void flushIdleConnections()
+    {
+      long currentTime = System.currentTimeMillis();
+      // First, remove connections that are over the quota
+      while (connectionThrottler.checkDestroyPooledConnection())
+      {
+        // Destroy the oldest ones first
+        IThrottledConnection connection;
+        synchronized (connections)
+        {
+          connection = connections.remove(0);
+        }
+        // This must be destroy(), not close().  ThrottledConnection.close() hands the connection
+        // back to release(), which would either re-pool it or destroy it and report the
+        // destruction itself, so the report below would be wrong in both cases.
+        connection.destroy();
+        connectionThrottler.noteConnectionDestroyed();
+      }
+      // Now, get rid of expired connections
+      while (true)
+      {
+        boolean expired;
+        synchronized (connections)
+        {
+          expired = connections.size() > 0 && connections.get(0).hasExpired(currentTime);
+        }
+        if (!expired)
+          break;
+        // We found an expired connection!  Now tell the throttler that, and see if it agrees.
+        if (connectionThrottler.checkExpireConnection())
+        {
+          // Remove a connection from the pool, and destroy it.
+          // It's not guaranteed to be an expired one, but that's a rare occurrence, we expect.
+          IThrottledConnection connection;
+          synchronized (connections)
+          {
+            connection = connections.remove(0);
+          }
+          connection.destroy();
+          connectionThrottler.noteConnectionDestroyed();
+        }
+        else
+        {
+          // The throttler declined.  Nothing was removed, so the head of the list is still
+          // expired; stop here rather than spin, and let the next poll try again.
+          break;
+        }
+      }
+    }
+
+  }
}
Modified: manifoldcf/trunk/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/WebcrawlerConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/WebcrawlerConnector.java?rev=1551707&r1=1551706&r2=1551707&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/WebcrawlerConnector.java (original)
+++ manifoldcf/trunk/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/WebcrawlerConnector.java Tue Dec 17 21:12:42 2013
@@ -160,6 +160,8 @@ public class WebcrawlerConnector extends
protected int connectionTimeoutMilliseconds = 60000;
/** Socket timeout, milliseconds */
protected int socketTimeoutMilliseconds = 300000;
+ /** Throttle group name */
+ protected String throttleGroupName = null;
// Canonicalization enabling/disabling. Eventually this will probably need to be by regular expression.
@@ -354,6 +356,9 @@ public class WebcrawlerConnector extends
{
String x;
+ // Either set this from the connection name, or just have one. Right now, we have one.
+ String throttleGroupName = "";
+
String emailAddress = params.getParameter(WebcrawlerConfig.PARAMETER_EMAIL);
if (emailAddress == null)
throw new ManifoldCFException("Missing email address");
@@ -406,7 +411,7 @@ public class WebcrawlerConnector extends
public void poll()
throws ManifoldCFException
{
- ThrottledFetcher.flushIdleConnections();
+ ThrottledFetcher.flushIdleConnections(currentContext);
}
/** Check status of connection.
@@ -425,6 +430,7 @@ public class WebcrawlerConnector extends
public void disconnect()
throws ManifoldCFException
{
+ throttleGroupName = null;
throttleDescription = null;
credentialsDescription = null;
trustsDescription = null;
@@ -711,7 +717,9 @@ public class WebcrawlerConnector extends
// Prepare to perform the fetch, and decide what to do with the document.
//
- IThrottledConnection connection = ThrottledFetcher.getConnection(protocol,ipAddress,port,
+ IThrottledConnection connection = ThrottledFetcher.getConnection(currentContext,
+ throttleGroupName,
+ protocol,ipAddress,port,
credential,trustStore,throttleDescription,binNames,connectionLimit,
proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
try
@@ -5126,7 +5134,8 @@ public class WebcrawlerConnector extends
// We've successfully obtained a lock on reading robots for this server! Now, guarantee that we'll free it, by instantiating a try/finally
try
{
- IThrottledConnection connection = ThrottledFetcher.getConnection(protocol,hostIPAddress,port,credential,
+ IThrottledConnection connection = ThrottledFetcher.getConnection(currentContext,throttleGroupName,
+ protocol,hostIPAddress,port,credential,
trustStore,throttleDescription,binNames,connectionLimit,
proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
try
Modified: manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/IdleCleanupThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/IdleCleanupThread.java?rev=1551707&r1=1551706&r2=1551707&view=diff
==============================================================================
--- manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/IdleCleanupThread.java (original)
+++ manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/IdleCleanupThread.java Tue Dec 17 21:12:42 2013
@@ -55,6 +55,8 @@ public class IdleCleanupThread extends T
ICacheManager cacheManager = CacheManagerFactory.make(threadContext);
// Get the output connector pool handle
IOutputConnectorPool outputConnectorPool = OutputConnectorPoolFactory.make(threadContext);
+ // Throttler subsystem
+ IThrottleGroups throttleGroups = ThrottleGroupsFactory.make(threadContext);
/* For HSQLDB debugging...
IDBInterface database = DBInterfaceFactory.make(threadContext,
@@ -88,6 +90,9 @@ public class IdleCleanupThread extends T
// Do the cleanup
outputConnectorPool.pollAllConnectors();
+ // Poll connection bins
+ throttleGroups.poll();
+ // Expire objects
cacheManager.expireObjects(System.currentTimeMillis());
// Sleep for the retry interval.
Modified: manifoldcf/trunk/framework/build.xml
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/build.xml?rev=1551707&r1=1551706&r2=1551707&view=diff
==============================================================================
--- manifoldcf/trunk/framework/build.xml (original)
+++ manifoldcf/trunk/framework/build.xml Tue Dec 17 21:12:42 2013
@@ -1482,6 +1482,7 @@
<test name="org.apache.manifoldcf.core.common.DateTest" todir="test-output"/>
<test name="org.apache.manifoldcf.core.fuzzyml.TestFuzzyML" todir="test-output"/>
<test name="org.apache.manifoldcf.core.lockmanager.TestZooKeeperLocks" todir="test-output"/>
+ <test name="org.apache.manifoldcf.core.throttler.TestThrottler" todir="test-output"/>
</junit>
</target>
Modified: manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/system/ManifoldCF.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/system/ManifoldCF.java?rev=1551707&r1=1551706&r2=1551707&view=diff
==============================================================================
--- manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/system/ManifoldCF.java (original)
+++ manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/system/ManifoldCF.java Tue Dec 17 21:12:42 2013
@@ -253,6 +253,8 @@ public class ManifoldCF
masterDatabaseUsername = LockManagerFactory.getStringProperty(threadContext,masterDatabaseUsernameProperty,"manifoldcf");
masterDatabasePassword = LockManagerFactory.getStringProperty(threadContext,masterDatabasePasswordProperty,"local_pg_passwd");
+ // Register the throttler for cleanup on shutdown
+ addShutdownHook(new ThrottlerShutdown());
// Register the file tracker for cleanup on shutdown
tracker = new FileTrack();
addShutdownHook(tracker);
@@ -1383,6 +1385,38 @@ public class ManifoldCF
}
+  /** Shutdown hook that cleans up the throttler on exit */
+  protected static class ThrottlerShutdown implements IShutdownHook
+  {
+    /** Constructor.  There is no state to set up. */
+    public ThrottlerShutdown()
+    {
+    }
+
+    /** Obtain the throttle groups handle for the given thread context and destroy it.
+    *@param threadContext is the thread context used to obtain the handle.
+    */
+    @Override
+    public void doCleanup(IThreadContext threadContext)
+      throws ManifoldCFException
+    {
+      ThrottleGroupsFactory.make(threadContext).destroy();
+    }
+
+    /** Finalizer, which is designed to catch class unloading that tomcat 5.5 does.
+    * Runs the same cleanup with a freshly made thread context, then always chains to the superclass finalizer.
+    */
+    protected void finalize()
+      throws Throwable
+    {
+      try
+      {
+        IThreadContext cleanupContext = ThreadContextFactory.make();
+        doCleanup(cleanupContext);
+      }
+      finally
+      {
+        super.finalize();
+      }
+    }
+
+  }
+
/** Class that cleans up database handles on exit */
protected static class DatabaseShutdown implements IShutdownHook
{