You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text rendering.)
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/17 22:12:43 UTC

svn commit: r1551707 [2/2] - in /manifoldcf/trunk: ./ connectors/rss/ connectors/rss/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/rss/ connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler...

Modified: manifoldcf/trunk/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottledFetcher.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottledFetcher.java?rev=1551707&r1=1551706&r2=1551707&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottledFetcher.java (original)
+++ manifoldcf/trunk/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/ThrottledFetcher.java Tue Dec 17 21:12:42 2013
@@ -100,6 +100,12 @@ public class ThrottledFetcher
 {
   public static final String _rcsid = "@(#)$Id: ThrottledFetcher.java 989847 2010-08-26 17:52:30Z kwright $";
 
+  /** Web throttle group type */
+  protected static final String webThrottleGroupType = "_WEB_";
+  
+  /** Idle timeout */
+  protected static final long idleTimeout = 300000L;
+  
   /** This flag determines whether we record everything to the disk, as a means of doing a web snapshot */
   protected static final boolean recordEverything = false;
 
@@ -109,18 +115,13 @@ public class ThrottledFetcher
   protected static final long TIME_6HRS = 6L * 60L * 60000L;
   protected static final long TIME_1DAY = 24L * 60L * 60000L;
 
+  /** The read chunk length */
+  protected static final int READ_CHUNK_LENGTH = 4096;
 
-  // The following static bin pools correspond to global resources that will be managed via ILockManager.
+  /** Connection pools.
+  /* This is a static hash of the connection pools in existence.  Each connection pool represents a set of identical connections. */
+  protected final static Map<ConnectionPoolKey,ConnectionPool> connectionPools = new HashMap<ConnectionPoolKey,ConnectionPool>();
   
-  /** This is the static pool of ConnectionBin's, keyed by bin name. */
-  protected static Map<String,ConnectionBin> connectionBins = new HashMap<String,ConnectionBin>();
-  /** This is the static pool of ThrottleBin's, keyed by bin name. */
-  protected static Map<String,ThrottleBin> throttleBins = new HashMap<String,ThrottleBin>();
-
-  /** This global lock protects the "distributed pool" resource, and insures that a connection
-  * can get pulled out of all the right pools and wind up in only the hands of one thread. */
-  protected static Integer poolLock = new Integer(0);
-
   /** Current host name */
   private static String currentHost = null;
   static
@@ -138,17 +139,13 @@ public class ThrottledFetcher
     }
   }
 
-  /** The read chunk length */
-  protected static final int READ_CHUNK_LENGTH = 4096;
-
-  /** Constructor.
+  /** Constructor.  Private since we never instantiate.
   */
-  public ThrottledFetcher()
+  private ThrottledFetcher()
   {
   }
 
 
-
   /** Obtain a connection to specified protocol, server, and port.  We use the protocol because the
   * setup for some protocols is extensive (e.g. https) and hopefully would not need to be repeated if
   * we distinguish connections based on that.
@@ -164,15 +161,22 @@ public class ThrottledFetcher
   *@param connectionLimit isthe maximum number of connections permitted.
   *@return an IThrottledConnection object that can be used to fetch from the port.
   */
-  public static IThrottledConnection getConnection(String protocol, String server, int port,
+  public static IThrottledConnection getConnection(IThreadContext threadContext, String throttleGroupName,
+    String protocol, String server, int port,
     PageCredentials authentication,
     IKeystoreManager trustStore,
-    ThrottleDescription throttleDescription, String[] binNames,
+    IThrottleSpec throttleDescription, String[] binNames,
     int connectionLimit,
     String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
     throws ManifoldCFException
   {
-    // Create the https scheme for this connection
+    // Get a throttle groups handle
+    IThrottleGroups throttleGroups = ThrottleGroupsFactory.make(threadContext);
+    
+    // Create the appropruate throttle group, or update the throttle description for an existing one
+    throttleGroups.createOrUpdateThrottleGroup(webThrottleGroupType,throttleGroupName,throttleDescription);
+    
+    // Create the https scheme and trust store string for this connection
     javax.net.ssl.SSLSocketFactory baseFactory;
     String trustStoreString;
     if (trustStore != null)
@@ -186,781 +190,68 @@ public class ThrottledFetcher
       trustStoreString = null;
     }
 
-
-    ConnectionBin[] bins = new ConnectionBin[binNames.length];
-
-    // Now, start looking for a connection
-    int i = 0;
-    while (i < binNames.length)
+    // Construct a connection pool key
+    ConnectionPoolKey poolKey = new ConnectionPoolKey(protocol,server,port,authentication,
+      trustStoreString,proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
+    
+    ConnectionPool p;
+    synchronized (connectionPools)
     {
-      String binName = binNames[i];
-
-      // Find or create the bin object
-      ConnectionBin cb;
-      synchronized (connectionBins)
+      p = connectionPools.get(poolKey);
+      if (p == null)
       {
-        cb = connectionBins.get(binName);
-        if (cb == null)
-        {
-          cb = new ConnectionBin(binName);
-          connectionBins.put(binName,cb);
-        }
-        //cb.sanityCheck();
+        // Construct a new IConnectionThrottler.
+        IConnectionThrottler connectionThrottler =
+          throttleGroups.obtainConnectionThrottler(webThrottleGroupType,throttleGroupName,binNames);
+        p = new ConnectionPool(connectionThrottler,protocol,server,port,authentication,baseFactory,
+          proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
+        connectionPools.put(poolKey,p);
       }
-      bins[i] = cb;
-      i++;
     }
-
-    ThrottledConnection connectionToReuse;
-
-    long startTime = 0L;
-    if (Logging.connectors.isDebugEnabled())
+    
+    try
     {
-      startTime = System.currentTimeMillis();
-      Logging.connectors.debug("WEB: Waiting to start getting a connection to "+protocol+"://"+server+":"+port);
+      return p.grab();
     }
-
-    synchronized (poolLock)
+    catch (InterruptedException e)
     {
-
-      // If the number of outstanding connections is greater than the global limit, close pooled connections until we are under the limit
-      long idleTimeout = 64000L;
-      while (true)
-      {
-        int openCount = 0;
-
-        // Lock up everything for a moment
-        synchronized (connectionBins)
-        {
-          // Time out connections that have been idle too long.  To do this, we need to go through
-          // all connection bins and look at the pool
-          for (String binName : connectionBins.keySet())
-          {
-            ConnectionBin cb = connectionBins.get(binName);
-            openCount += cb.countConnections();
-          }
-        }
-
-        if (openCount < connectionLimit)
-          break;
-
-        if (idleTimeout == 0L)
-        {
-          // Can't actually conclude anything here unfortunately
-
-          // Logging.connectors.warn("Web: Exceeding connection limit!  Open count = "+Integer.toString(openCount)+"; limit = "+Integer.toString(connectionLimit));
-          break;
-        }
-        idleTimeout = idleTimeout/4L;
-
-        // Lock up everything for a moment, since otherwise we could delete something people
-        // expect to stick around.
-        synchronized (connectionBins)
-        {
-          // Time out connections that have been idle too long.  To do this, we need to go through
-          // all connection bins and look at the pool
-          for (String binName : connectionBins.keySet())
-          {
-            ConnectionBin cb = connectionBins.get(binName);
-            cb.flushIdleConnections(idleTimeout);
-          }
-        }
-      }
-
-      try
-      {
-        // Retry until we get the connection.
-        while (true)
-        {
-          if (Logging.connectors.isDebugEnabled())
-            Logging.connectors.debug("WEB: Attempting to get connection to "+protocol+"://"+server+":"+port+" ("+new Long(System.currentTimeMillis()-startTime).toString()+" ms)");
-
-          i = 0;
-
-          connectionToReuse = null;
-
-          try
-          {
-
-            // Now, start looking for a connection
-            while (i < binNames.length)
-            {
-              String binName = binNames[i];
-              ConnectionBin cb = bins[i];
-
-              // Figure out the connection limit for this bin, based on the throttle description
-              int maxConnections = throttleDescription.getMaxOpenConnections(binName);
-
-              // If no restriction, use a very large value.
-              if (maxConnections == -1)
-                maxConnections = Integer.MAX_VALUE;
-              else if (maxConnections == 0)
-                maxConnections = 1;
-
-              // Now, do what we need to do to reserve our connection for this bin.
-              // If we can't reserve it now, we plan on undoing everything we did, so
-              // whatever we do must be reversible.  Furthermore, nothing we call here
-              // should actually wait(); that will occur if we can't get what we need out
-              // here at this level.
-
-              if (connectionToReuse != null)
-              {
-                // We have a reuse candidate already, so just make sure each remaining bin is within
-                // its limits.
-                cb.insureWithinLimits(maxConnections,connectionToReuse);
-              }
-              else
-              {
-                connectionToReuse = cb.findConnection(maxConnections,bins,protocol,server,port,authentication,trustStoreString,
-                  proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
-              }
-
-              // Increment after we successfully handled this bin
-              i++;
-            }
-
-            // That loop completed, meaning that we think we got a connection.  Now, go through all the bins and make sure there's enough time since the last
-            // fetch.  If not, we have to clean everything up and try again.
-            long currentTime = System.currentTimeMillis();
-
-            // Global lock needed to insure that fetch time is updated across all bins simultaneously
-            synchronized (connectionBins)
-            {
-              i = 0;
-              while (i < binNames.length)
-              {
-                String binName = binNames[i];
-                ConnectionBin cb = bins[i];
-                //cb.sanityCheck();
-                // Get the minimum time between fetches for this bin, based on the throttle description
-                long minMillisecondsPerFetch = throttleDescription.getMinimumMillisecondsPerFetch(binName);
-                if (cb.getLastFetchTime() + minMillisecondsPerFetch > currentTime)
-                  throw new WaitException(cb.getLastFetchTime() + minMillisecondsPerFetch - currentTime);
-                i++;
-              }
-              i = 0;
-              while (i < binNames.length)
-              {
-                ConnectionBin cb = bins[i++];
-                cb.setLastFetchTime(currentTime);
-              }
-            }
-
-          }
-          catch (Throwable e)
-          {
-            // We have to free everything and retry, because otherwise we are subject to deadlock.
-            // The only thing we have reserved is the connection, which we must free if there's a
-            // problem.
-
-            if (connectionToReuse != null)
-            {
-              // Return this connection to the pool.  That is, the pools for all the bins.
-              int k = 0;
-              while (k < binNames.length)
-              {
-                String binName = binNames[k++];
-                ConnectionBin cb;
-                synchronized (connectionBins)
-                {
-                  cb = connectionBins.get(binName);
-                  if (cb == null)
-                  {
-                    cb = new ConnectionBin(binName);
-                    connectionBins.put(binName,cb);
-                  }
-                }
-                //cb.sanityCheck();
-                cb.addToPool(connectionToReuse);
-                //cb.sanityCheck();
-              }
-              connectionToReuse = null;
-              // We should not need to notify here because nothing has really changed from
-              // when the attempt started to get the connection.  We just undid what we did.
-            }
-
-
-            if (e instanceof Error)
-              throw (Error)e;
-            if (e instanceof ManifoldCFException)
-              throw (ManifoldCFException)e;
-
-            if (e instanceof WaitException)
-            {
-              // Wait because we need a certain amount of time after a previous fetch.
-              WaitException we = (WaitException)e;
-              long waitAmount = we.getWaitAmount();
-              if (Logging.connectors.isDebugEnabled())
-                Logging.connectors.debug("WEB: Waiting "+new Long(waitAmount).toString()+" ms before starting fetch on "+protocol+"://"+server+":"+port);
-              // Really don't want to sleep inside the pool lock!
-              // The easiest thing to do instead is to use a timed wait.  There is no reason why we need
-              // to wake before the wait time is exceeded - but it's harmless, and the alternative is to
-              // do more reorganization than probably is wise.
-              poolLock.wait(waitAmount);
-              continue;
-            }
-
-            if (e instanceof PoolException)
-            {
-
-              if (Logging.connectors.isDebugEnabled())
-                Logging.connectors.debug("WEB: Going into wait for connection to "+protocol+"://"+server+":"+port+" ("+new Long(System.currentTimeMillis()-startTime).toString()+" ms)");
-
-              // Now, wait for something external to change.  The only thing that can help us is if
-              // some other thread frees a connection.
-              poolLock.wait();
-              // Go back around and try again.
-              continue;
-            }
-
-            throw new ManifoldCFException("Unexpected exception encountered: "+e.getMessage(),e);
-          }
-
-          if (Logging.connectors.isDebugEnabled())
-            Logging.connectors.debug("WEB: Successfully got connection to "+protocol+"://"+server+":"+port+" ("+new Long(System.currentTimeMillis()-startTime).toString()+" ms)");
-
-          // If we have a connection located, activate it.
-          if (connectionToReuse == null)
-            connectionToReuse = new ThrottledConnection(protocol,server,port,authentication,baseFactory,trustStoreString,bins,
-              proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
-          connectionToReuse.setup(throttleDescription);
-          return connectionToReuse;
-        }
-      }
-      catch (InterruptedException e)
-      {
-        throw new ManifoldCFException(e.getMessage(),ManifoldCFException.INTERRUPTED);
-      }
+      throw new ManifoldCFException(e.getMessage(),ManifoldCFException.INTERRUPTED);
     }
   }
 
-
   /** Flush connections that have timed out from inactivity. */
-  public static void flushIdleConnections()
+  public static void flushIdleConnections(IThreadContext threadContext)
     throws ManifoldCFException
   {
-    synchronized (poolLock)
-    {
-      // Lock up everything for a moment, since otherwise we could delete something people
-      // expect to stick around.
-      synchronized (connectionBins)
-      {
-        // Time out connections that have been idle too long.  To do this, we need to go through
-        // all connection bins and look at the pool
-        for (String binName : connectionBins.keySet())
-        {
-          ConnectionBin cb = connectionBins.get(binName);
-          if (cb.flushIdleConnections(60000L))
-          {
-            // Bin is no longer doing anything; get rid of it.
-            // I've determined this is safe - inUseConnections is designed to prevent any active connection from getting
-            // whacked.
-            // Oops.  Hang results again when I enabled this, so out it goes again.
-            //connectionBins.remove(binName);
-            //binIter = connectionBins.keySet().iterator();
-          }
-        }
-      }
-    }
-  }
-
-  /** Connection pool for a bin.
-  * An instance of this class tracks the connections that are pooled and that are in use for a specific bin.
-  * NOTE WELL: This resource must be constrained globally, across all JVMs!
-  * To do that, we need an ILockManager to handle the global data for each bin.
-  */
-  protected static class ConnectionBin
-  {
-    /** This is the bin name which this connection pool belongs to */
-    protected String binName;
-    /** This is the number of connections in this bin that are signed out and presumably in use */
-    protected int inUseConnections = 0;
-    /** This is the last time a fetch was done on this bin */
-    protected long lastFetchTime = 0L;
-    /** This object is what we synchronize on when we are waiting on a connection to free up for this
-    * bin.  This is a separate object, because we also want to protect the integrity of the
-    * ConnectionBin object itself, for which we'll use the ConnectionBin's synchronizer. */
-    protected Integer connectionWait = new Integer(0);
-    /** This map contains ThrottledConnection objects that are in the pool, and are not in use. */
-    protected HashMap freePool = new HashMap();
-
-    /** Constructor. */
-    public ConnectionBin(String binName)
-    {
-      this.binName = binName;
-    }
-
-    /** Get the bin name. */
-    public String getBinName()
-    {
-      return binName;
-    }
-
-    /** Note the creation of an active connection that belongs to this bin.  The slots all must
-    * have been reserved prior to the connection being created.
-    */
-    public synchronized void noteConnectionCreation()
-    {
-      inUseConnections++;
-    }
-
-    /** Note the destruction of an active connection that belongs to this bin.
-    */
-    public synchronized void noteConnectionDestruction()
-    {
-      inUseConnections--;
-    }
-
-
-    /** Activate a connection that should be in the pool.
-    * Removes the connection from the pool.
-    */
-    public synchronized void takeFromPool(ThrottledConnection tc)
-    {
-      // Remove this connection from the pool list
-      freePool.remove(tc);
-      inUseConnections++;
-    }
-
-    /** Put a connection into the pool.
-    */
-    public synchronized void addToPool(ThrottledConnection tc)
-    {
-      // Add this connection to the pool list
-      freePool.put(tc,tc);
-      inUseConnections--;
-    }
-
-    /** Verify that this bin is within limits.
-    */
-    public synchronized void insureWithinLimits(int maxConnections, ThrottledConnection existingConnection)
-      throws PoolException
-    {
-      //sanityCheck();
-
-      // See if the connection is in fact within the pool; if so, we just presume the limits are fine as they are.
-      // This is necessary because if the connection that's being checked for is freed, then we wreck the data structures.
-      if (existsInPool(existingConnection))
-        return;
-
-      while (maxConnections > 0 && inUseConnections + freePool.size() > maxConnections)
-      {
-        //sanityCheck();
-
-        // If there are any pool connections, free them one at a time
-        ThrottledConnection freeMe = getPoolConnection();
-        if (freeMe != null)
-        {
-          // It's okay to call activate since we guarantee that only one thread is trying to grab
-          // a connection at a time.
-          freeMe.activate();
-          freeMe.destroy();
-          continue;
-        }
-
-        // Instead of waiting, throw a pool exception, so that we can wait and retry at the next level up.
-        throw new PoolException("Waiting for a connection");
-      }
-    }
-
-    /** This method is called only when there is no existing connection yet identified that can be used
-    * for contacting the server and port specified.  This method returns a connection if a matching one can be found;
-    * otherwise it returns null.
-    * If a matching connection is found, it is activated before it is returned.  That removes the connection from all
-    * pools in which it lives.
-    */
-    public synchronized ThrottledConnection findConnection(int maxConnections,
-      ConnectionBin[] binNames, String protocol, String server, int port,
-      PageCredentials authentication, String trustStoreString,
-      String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
-      throws PoolException
-    {
-      //sanityCheck();
-
-      // First, wait until there's no excess.
-      while (maxConnections > 0 && inUseConnections + freePool.size() > maxConnections)
-      {
-        //sanityCheck();
-        // If there are any pool connections, free them one at a time
-        ThrottledConnection freeMe = getPoolConnection();
-        if (freeMe != null)
-        {
-          // It's okay to call activate since we guarantee that only one thread is trying to grab
-          // a connection at a time.
-          freeMe.activate();
-          freeMe.destroy();
-          continue;
-        }
-
-        // Instead of waiting, throw a pool exception, so that we can wait and retry at the next level up.
-        throw new PoolException("Waiting for a connection");
-
-      }
-
-      // Wait until there's a free one
-      if (maxConnections > 0 && inUseConnections > maxConnections-1)
-      {
-        // Instead of waiting, throw a pool exception, so that we can wait and retry at the next level up.
-        throw new PoolException("Waiting for a connection");
-      }
-
-      // A null return means that there is no existing pooled connection that matches, and the  caller is free to create a new connection
-      ThrottledConnection rval = getPoolConnection();
-      if (rval == null)
-        return null;
-
-      // It's okay to call activate since we guarantee that only one thread is trying to grab
-      // a connection at a time.
-      rval.activate();
-      //sanityCheck();
-
-      if (!rval.matches(binNames,protocol,server,port,authentication,trustStoreString,
-        proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword))
-      {
-        // Destroy old connection.  That should free up space for a new creation.
-        rval.destroy();
-        // Return null to indicate that we can create a new connection now
-        return null;
-      }
-
-      // Existing entry matched.  Activate and return it.
-      return rval;
-    }
-
-    /** Note a new time for connection fetch for this pool.
-    *@param currentTime is the time the fetch was started.
-    */
-    public synchronized void setLastFetchTime(long currentTime)
-    {
-      if (currentTime > lastFetchTime)
-        lastFetchTime = currentTime;
-    }
-
-    /** Get the last fetch time.
-    *@return the time.
-    */
-    public synchronized long getLastFetchTime()
-    {
-      return lastFetchTime;
-    }
-
-    /** Count connections that are in use.
-    *@return connections that are in use.
-    */
-    public synchronized int countConnections()
-    {
-      return freePool.size() + inUseConnections;
-    }
-
-    /** Flush any idle connections.
-    *@return true if the connection bin is now, in fact, empty.
-    */
-    public synchronized boolean flushIdleConnections(long idleTimeout)
-    {
-      //sanityCheck();
-
-      // We have to time out the pool connections.  When there are no pool connections
-      // left, AND the in-use counts are zero, we can delete the whole thing.
-      Iterator iter = freePool.keySet().iterator();
-      while (iter.hasNext())
-      {
-        ThrottledConnection tc = (ThrottledConnection)iter.next();
-        if (tc.flushIdleConnections(idleTimeout))
-        {
-          // Can delete this connection, since it timed out.
-          tc.activate();
-          tc.destroy();
-          iter = freePool.keySet().iterator();
-        }
-      }
-
-      //sanityCheck();
-
-      return (freePool.size() == 0 && inUseConnections == 0);
-    }
-
-    /** Grab a connection from the current pool.  This does not remove the connection from the pool;
-    * it just sets it up so that later methods can do that.
-    */
-    protected ThrottledConnection getPoolConnection()
-    {
-      if (freePool.size() == 0)
-        return null;
-      Iterator iter = freePool.keySet().iterator();
-      ThrottledConnection rval = (ThrottledConnection)iter.next();
-      return rval;
-    }
-
-    /** Check if a connection exists in the pool already.
-    */
-    protected boolean existsInPool(ThrottledConnection tc)
-    {
-      return freePool.get(tc) != null;
-    }
-
-    public synchronized void sanityCheck()
-    {
-      // Make sure all the connections in the current pool in fact have a reference to this bin.
-      Iterator iter = freePool.keySet().iterator();
-      while (iter.hasNext())
-      {
-        ThrottledConnection tc = (ThrottledConnection)iter.next();
-        tc.mustHaveReference(this);
-      }
-    }
-
-  }
-
-  /** Throttles for a bin.
-  * An instance of this class keeps track of the information needed to bandwidth throttle access
-  * to a url belonging to a specific bin.
-  *
-  * In order to calculate
-  * the effective "burst" fetches per second and bytes per second, we need to have some idea what the window is.
-  * For example, a long hiatus from fetching could cause overuse of the server when fetching resumes, if the
-  * window length is too long.
-  *
-  * One solution to this problem would be to keep a list of the individual fetches as records.  Then, we could
-  * "expire" a fetch by discarding the old record.  However, this is quite memory consumptive for all but the
-  * smallest intervals.
-  *
-  * Another, better, solution is to hook into the start and end of individual fetches.  These will, presumably, occur
-  * at the fastest possible rate without long pauses spent doing something else.  The only complication is that
-  * fetches may well overlap, so we need to "reference count" the fetches to know when to reset the counters.
-  * For "fetches per second", we can simply make sure we "schedule" the next fetch at an appropriate time, rather
-  * than keep records around.  The overall rate may therefore be somewhat less than the specified rate, but that's perfectly
-  * acceptable.
-  *
-  * Some notes on the algorithms used to limit server bandwidth impact
-  * ==================================================================
-  *
-  * In a single connection case, the algorithm we'd want to use works like this.  On the first chunk of a series,
-  * the total length of time and the number of bytes are recorded.  Then, prior to each subsequent chunk, a calculation
-  * is done which attempts to hit the bandwidth target by the end of the chunk read, using the rate of the first chunk
-  * access as a way of estimating how long it will take to fetch those next n bytes.
-  *
-  * For a multi-connection case, which this is, it's harder to either come up with a good maximum bandwidth estimate,
-  * and harder still to "hit the target", because simultaneous fetches will intrude.  The strategy is therefore:
-  *
-  * 1) The first chunk of any series should proceed without interference from other connections to the same server.
-  *    The goal here is to get a decent quality estimate without any possibility of overwhelming the server.
-  *
-  * 2) The bandwidth of the first chunk is treated as the "maximum bandwidth per connection".  That is, if other
-  *    connections are going on, we can presume that each connection will use at most the bandwidth that the first fetch
-  *    took.  Thus, by generating end-time estimates based on this number, we are actually being conservative and
-  *    using less server bandwidth.
-  *
-  * 3) For chunks that have started but not finished, we keep track of their size and estimated elapsed time in order to schedule when
-  *    new chunks from other connections can start.
-  *
-  * NOTE WELL: This resource must be constrained globally, across all JVMs!
-  * To do that, we need an ILockManager to handle the global data for each bin.
-  */
-  protected static class ThrottleBin
-  {
-    /** This is the bin name which this throttle belongs to. */
-    protected final String binName;
-    /** This is the reference count for this bin (which records active references) */
-    protected volatile int refCount = 0;
-    /** The inverse rate estimate of the first fetch, in ms/byte */
-    protected double rateEstimate = 0.0;
-    /** Flag indicating whether a rate estimate is needed */
-    protected volatile boolean estimateValid = false;
-    /** Flag indicating whether rate estimation is in progress yet */
-    protected volatile boolean estimateInProgress = false;
-    /** The start time of this series */
-    protected long seriesStartTime = -1L;
-    /** Total actual bytes read in this series; this includes fetches in progress */
-    protected long totalBytesRead = -1L;
-
-    /** Constructor. */
-    public ThrottleBin(String binName)
-    {
-      this.binName = binName;
-    }
-
-    /** Get the bin name. */
-    public String getBinName()
-    {
-      return binName;
-    }
-
-    /** Note the start of a fetch operation for a bin.  Call this method just before the actual stream access begins.
-    * May wait until schedule allows.
-    */
-    public void beginFetch()
-      throws InterruptedException
+    // Go through outstanding connection pools and clean them up.
+    synchronized (connectionPools)
     {
-      synchronized (this)
-      {
-        if (refCount == 0)
-        {
-          // Now, reset bandwidth throttling counters
-          estimateValid = false;
-          rateEstimate = 0.0;
-          totalBytesRead = 0L;
-          estimateInProgress = false;
-          seriesStartTime = -1L;
-        }
-        refCount++;
-      }
-
-    }
-
-    /** Abort the fetch.
-    */
-    public void abortFetch()
-    {
-      synchronized (this)
+      for (ConnectionPool pool : connectionPools.values())
       {
-        refCount--;
+        pool.flushIdleConnections();
       }
     }
-    
-    /** Note the start of an individual byte read of a specified size.  Call this method just before the
-    * read request takes place.  Performs the necessary delay prior to reading specified number of bytes from the server.
-    */
-    public void beginRead(int byteCount, double minimumMillisecondsPerBytePerServer)
-      throws InterruptedException
-    {
-      long currentTime = System.currentTimeMillis();
-
-      synchronized (this)
-      {
-        while (estimateInProgress)
-          wait();
-        if (estimateValid == false)
-        {
-          seriesStartTime = currentTime;
-          estimateInProgress = true;
-          // Add these bytes to the estimated total
-          totalBytesRead += (long)byteCount;
-          // Exit early; this thread isn't going to do any waiting
-          return;
-        }
-      }
-
-      // It is possible for the following code to get interrupted.  If that happens,
-      // we have to unstick the threads that are waiting on the estimate!
-      boolean finished = false;
-      try
-      {
-        long waitTime = 0L;
-        synchronized (this)
-        {
-          // Add these bytes to the estimated total
-          totalBytesRead += (long)byteCount;
-
-          // Estimate the time this read will take, and wait accordingly
-          long estimatedTime = (long)(rateEstimate * (double)byteCount);
-
-          // Figure out how long the total byte count should take, to meet the constraint
-          long desiredEndTime = seriesStartTime + (long)(((double)totalBytesRead) * minimumMillisecondsPerBytePerServer);
-
-          // The wait time is the different between our desired end time, minus the estimated time to read the data, and the
-          // current time.  But it can't be negative.
-          waitTime = (desiredEndTime - estimatedTime) - currentTime;
-        }
-
-        if (waitTime > 0L)
-        {
-          if (Logging.connectors.isDebugEnabled())
-            Logging.connectors.debug("WEB: Performing a read wait on bin '"+binName+"' of "+
-            new Long(waitTime).toString()+" ms.");
-          ManifoldCF.sleep(waitTime);
-        }
-        finished = true;
-      }
-      finally
-      {
-        if (!finished)
-        {
-          abortRead();
-        }
-      }
-    }
-
-    /** Abort a read in progress.
-    */
-    public void abortRead()
-    {
-      synchronized (this)
-      {
-        if (estimateInProgress)
-        {
-          estimateInProgress = false;
-          notifyAll();
-        }
-      }
-    }
-    
-    /** Note the end of an individual read from the server.  Call this just after an individual read completes.
-    * Pass the actual number of bytes read to the method.
-    */
-    public void endRead(int originalCount, int actualCount)
-    {
-      long currentTime = System.currentTimeMillis();
-
-      synchronized (this)
-      {
-        totalBytesRead = totalBytesRead + (long)actualCount - (long)originalCount;
-        if (estimateInProgress)
-        {
-          if (actualCount == 0)
-            // Didn't actually get any bytes, so use 0.0
-            rateEstimate = 0.0;
-          else
-            rateEstimate = ((double)(currentTime - seriesStartTime))/(double)actualCount;
-          estimateValid = true;
-          estimateInProgress = false;
-          notifyAll();
-        }
-      }
-    }
-
-    /** Note the end of a fetch operation.  Call this method just after the fetch completes.
-    */
-    public boolean endFetch()
-    {
-      synchronized (this)
-      {
-        refCount--;
-        return (refCount == 0);
-      }
-
-    }
-
   }
 
   /** Throttled connections.  Each instance of a connection describes the bins to which it belongs,
   * along with the actual open connection itself, and the last time the connection was used. */
   protected static class ThrottledConnection implements IThrottledConnection
   {
-    /** The connection has resolved pointers to the ConnectionBin structures that manage pool
-    * maximums.  These are ONLY valid when the connection is actually in the pool. */
-    protected ConnectionBin[] connectionBinArray;
-    /** The connection has resolved pointers to the ThrottleBin structures that help manage
-    * bandwidth throttling. */
-    protected ThrottleBin[] throttleBinArray;
-    /** These are the bandwidth limits, per bin */
-    protected double[] minMillisecondsPerByte;
-    /** Is the connection considered "active"? */
-    protected boolean isActive;
-    /** If not active, this is when it went inactive */
-    protected long inactiveTime = 0L;
-
+    /** Connection pool */
+    protected final ConnectionPool myPool;
+    /** Fetch throttler */
+    protected final IFetchThrottler fetchThrottler;
     /** Protocol */
-    protected String protocol;
+    protected final String protocol;
     /** Server */
-    protected String server;
+    protected final String server;
     /** Port */
-    protected int port;
+    protected final int port;
     /** Authentication */
-    protected PageCredentials authentication;
-    /** Trust store */
-    protected IKeystoreManager trustStore;
-    /** Trust store string */
-    protected String trustStoreString;
+    protected final PageCredentials authentication;
+
+    /** This is when the connection will expire.  Only valid if connection is in the pool. */
+    protected long expireTime = -1L;
 
     /** The http connection manager.  The pool is of size 1.  */
     protected PoolingClientConnectionManager connManager = null;
@@ -975,210 +266,66 @@ public class ThrottledFetcher
     /** The status code fetched, if any */
     protected int statusCode = FETCH_NOT_TRIED;
     /** The kind of fetch we are doing */
-    protected String fetchType = null;
-    /** The current bytes in the current fetch */
-    protected long fetchCounter = 0L;
-    /** The start of the current fetch */
-    protected long startFetchTime = -1L;
-    /** The cookies from the last fetch */
-    protected LoginCookies lastFetchCookies = null;
-    /** Proxy host */
-    protected final String proxyHost;
-    /** Proxy port */
-    protected final int proxyPort;
-    /** Proxy auth domain */
-    protected final String proxyAuthDomain;
-    /** Proxy auth user name */
-    protected final String proxyAuthUsername;
-    /** Proxy auth password */
-    protected final String proxyAuthPassword;
-    /** Https protocol */
-    protected final javax.net.ssl.SSLSocketFactory httpsSocketFactory;
-
-    /** The thread that is actually doing the work */
-    protected ExecuteMethodThread methodThread = null;
-    /** Set if thread has been started */
-    protected boolean threadStarted = false;
-    
-
-    /** Constructor.  Create a connection with a specific server and port, and
-    * register it as active against all bins. */
-    public ThrottledConnection(String protocol, String server, int port, PageCredentials authentication,
-      javax.net.ssl.SSLSocketFactory httpsSocketFactory, String trustStoreString, ConnectionBin[] connectionBins,
-      String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
-    {
-      this.proxyHost = proxyHost;
-      this.proxyPort = proxyPort;
-      this.proxyAuthDomain = proxyAuthDomain;
-      this.proxyAuthUsername = proxyAuthUsername;
-      this.proxyAuthPassword = proxyAuthPassword;
-      this.protocol = protocol;
-      this.server = server;
-      this.port = port;
-      this.authentication = authentication;
-      this.httpsSocketFactory = httpsSocketFactory;
-      this.trustStoreString = trustStoreString;
-      this.connectionBinArray = connectionBins;
-      this.throttleBinArray = new ThrottleBin[connectionBins.length];
-      this.minMillisecondsPerByte = new double[connectionBins.length];
-      this.isActive = true;
-      int i = 0;
-      while (i < connectionBins.length)
-      {
-        connectionBins[i].noteConnectionCreation();
-        // We don't keep throttle bin references around, since these are transient
-        throttleBinArray[i] = null;
-        minMillisecondsPerByte[i] = 0.0;
-        i++;
-      }
-
-
-    }
-
-    public void mustHaveReference(ConnectionBin cb)
-    {
-      int i = 0;
-      while (i < connectionBinArray.length)
-      {
-        if (cb == connectionBinArray[i])
-          return;
-        i++;
-      }
-      String msg = "Connection bin "+cb.toString()+" owns connection "+this.toString()+" for "+protocol+server+":"+port+
-        " but there is no back reference!";
-      Logging.connectors.error(msg);
-      System.out.println(msg);
-      new Exception(msg).printStackTrace();
-      System.exit(3);
-      //throw new RuntimeException(msg);
-    }
-
-    /** See if this instances matches a given server and port. */
-    public boolean matches(ConnectionBin[] bins, String protocol, String server, int port, PageCredentials authentication,
-      String trustStoreString, String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
-    {
-      if (this.trustStoreString == null || trustStoreString == null)
-      {
-        if (this.trustStoreString != trustStoreString)
-          return false;
-      }
-      else
-      {
-        if (!this.trustStoreString.equals(trustStoreString))
-          return false;
-      }
-
-      if (this.authentication == null || authentication == null)
-      {
-        if (this.authentication != authentication)
-          return false;
-      }
-      else
-      {
-        if (!this.authentication.equals(authentication))
-          return false;
-      }
-
-      if (this.proxyHost == null || proxyHost == null)
-      {
-        if (this.proxyHost != proxyHost)
-          return false;
-      }
-      else
-      {
-        if (!this.proxyHost.equals(proxyHost))
-          return false;
-        if (this.proxyAuthDomain == null || proxyAuthDomain == null)
-        {
-          if (this.proxyAuthDomain != proxyAuthDomain)
-            return false;
-        }
-        else
-        {
-          if (!this.proxyAuthDomain.equals(proxyAuthDomain))
-            return false;
-        }
-        if (this.proxyAuthUsername == null || proxyAuthUsername == null)
-        {
-          if (this.proxyAuthUsername != proxyAuthUsername)
-            return false;
-        }
-        else
-        {
-          if (!this.proxyAuthUsername.equals(proxyAuthUsername))
-            return false;
-        }
-        if (this.proxyAuthPassword == null || proxyAuthPassword == null)
-        {
-          if (this.proxyAuthPassword != proxyAuthPassword)
-            return false;
-        }
-        else
-        {
-          if (!this.proxyAuthPassword.equals(proxyAuthPassword))
-            return false;
-        }
-      }
-      
-      if (this.proxyPort != proxyPort)
-        return false;
-      
-      
-      if (this.connectionBinArray.length != bins.length || !this.protocol.equals(protocol) || !this.server.equals(server) || this.port != port)
-        return false;
-      
-      int i = 0;
-      while (i < bins.length)
-      {
-        if (connectionBinArray[i] != bins[i])
-          return false;
-        i++;
-      }
-      return true;
-    }
+    protected String fetchType = null;
+    /** The current bytes in the current fetch */
+    protected long fetchCounter = 0L;
+    /** The start of the current fetch */
+    protected long startFetchTime = -1L;
+    /** The cookies from the last fetch */
+    protected LoginCookies lastFetchCookies = null;
+    /** Proxy host */
+    protected final String proxyHost;
+    /** Proxy port */
+    protected final int proxyPort;
+    /** Proxy auth domain */
+    protected final String proxyAuthDomain;
+    /** Proxy auth user name */
+    protected final String proxyAuthUsername;
+    /** Proxy auth password */
+    protected final String proxyAuthPassword;
+    /** Https protocol */
+    protected final javax.net.ssl.SSLSocketFactory httpsSocketFactory;
 
-    /** Activate the connection. */
-    public void activate()
-    {
-      isActive = true;
-      int i = 0;
-      while (i < connectionBinArray.length)
-      {
-        connectionBinArray[i++].takeFromPool(this);
-      }
-    }
+    /** The thread that is actually doing the work */
+    protected ExecuteMethodThread methodThread = null;
+    /** Set if thread has been started */
+    protected boolean threadStarted = false;
+    
 
-    /** Set up the connection.  This allows us to feed all bins the correct bandwidth limit info.
-    */
-    public void setup(ThrottleDescription description)
+    /** Constructor.  Create a connection to a specific protocol, server and port; myPool is the pool that
+    * close() releases it back to, and fetchThrottler gates each fetch performed over this connection. */
+    public ThrottledConnection(ConnectionPool myPool, IFetchThrottler fetchThrottler,
+      String protocol, String server, int port, PageCredentials authentication,
+      javax.net.ssl.SSLSocketFactory httpsSocketFactory,
+      String proxyHost, int proxyPort, String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
     {
-      // Go through all bins, and set up the current limits.
-      int i = 0;
-      while (i < connectionBinArray.length)
-      {
-        String binName = connectionBinArray[i].getBinName();
-        minMillisecondsPerByte[i] = description.getMinimumMillisecondsPerByte(binName);
-        i++;
-      }
+      this.myPool = myPool;
+      this.fetchThrottler = fetchThrottler;
+      this.proxyHost = proxyHost;
+      this.proxyPort = proxyPort;
+      this.proxyAuthDomain = proxyAuthDomain;
+      this.proxyAuthUsername = proxyAuthUsername;
+      this.proxyAuthPassword = proxyAuthPassword;
+      this.protocol = protocol;
+      this.server = server;
+      this.port = port;
+      this.authentication = authentication;
+      this.httpsSocketFactory = httpsSocketFactory;
     }
 
-    /** Do periodic bookkeeping.
-    *@return true if the connection is no longer valid, and can be removed. */
-    public boolean flushIdleConnections(long idleTimeout)
+    /** Check whether the connection has expired.  Also asks connManager, if present, to close its idle and expired underlying connections.
+    *@param currentTime is the current time to use to judge if a connection has expired.
+    *@return true if currentTime is later than expireTime (set by close() when the connection is returned to its pool), meaning it should be destroyed.
+    */
+    @Override
+    public boolean hasExpired(long currentTime)
     {
-      if (isActive)
-        return false;
-
       if (connManager != null)
       {
         connManager.closeIdleConnections(idleTimeout, TimeUnit.MILLISECONDS);
         connManager.closeExpiredConnections();
-        // Need to determine if there's a valid connection in the connection manager still, or if it is empty.
-        //return connManager.getConnectionsInPool() == 0;
-        return true;
       }
-      else
-        return true;
+      return (currentTime > expireTime);
     }
 
     /** Log the fetch of a number of bytes, from within a stream. */
@@ -1187,65 +334,10 @@ public class ThrottledFetcher
       fetchCounter += (long)count;
     }
 
-    /** Begin a read operation, from within a stream */
-    public void beginRead(int len)
-      throws InterruptedException
-    {
-      // Consult with throttle bins
-      int lastOneDone = 0;
-      try
-      {
-        for (int i = 0; i < throttleBinArray.length; i++)
-        {
-          throttleBinArray[i].beginRead(len,minMillisecondsPerByte[i]);
-          lastOneDone = i + 1;
-        }
-      }
-      finally
-      {
-        if (lastOneDone != throttleBinArray.length)
-        {
-          for (int i = 0; i < lastOneDone; i++)
-          {
-            throttleBinArray[i].abortRead();
-          }
-        }
-      }
-    }
-
-    /** End a read operation, from within a stream */
-    public void endRead(int origLen, int actualAmt)
-    {
-      // Consult with throttle bins
-      Throwable e = null;
-      for (int i = 0; i < throttleBinArray.length; i++)
-      {
-        try
-        {
-          throttleBinArray[i].endRead(origLen,actualAmt);
-        }
-        catch (Throwable e2)
-        {
-          e = e2;
-        }
-      }
-      if (e != null)
-      {
-        if (e instanceof RuntimeException)
-          throw (RuntimeException)e;
-        else if (e instanceof Error)
-          throw (Error)e;
-        else
-          throw new RuntimeException("Unknown exception: " + e.getMessage(),e);
-      }
-    }
-
     /** Destroy the connection forever */
-    protected void destroy()
+    @Override
+    public void destroy()
     {
-      if (isActive == false)
-        throw new RuntimeException("Trying to destroy an inactive connection");
-
       // Kill the actual connection object.
       if (connManager != null)
       {
@@ -1253,13 +345,6 @@ public class ThrottledFetcher
         connManager = null;
       }
 
-      // Call all the bins this belongs to, and decrement the in-use count.
-      int i = 0;
-      while (i < connectionBinArray.length)
-      {
-        ConnectionBin cb = connectionBinArray[i++];
-        cb.noteConnectionDestruction();
-      }
     }
 
 
@@ -1273,43 +358,15 @@ public class ThrottledFetcher
     {
       this.fetchType = fetchType;
       this.fetchCounter = 0L;
-      int lastCreated = 0;
       try
       {
-        // Find or create the needed throttle bins
-        for (int i = 0; i < throttleBinArray.length; i++)
-        {
-          // Access the bins as we need them, and drop them when ref count goes to zero
-          String binName = connectionBinArray[i].getBinName();
-          ThrottleBin tb;
-          synchronized (throttleBins)
-          {
-            tb = throttleBins.get(binName);
-            if (tb == null)
-            {
-              tb = new ThrottleBin(binName);
-              throttleBins.put(binName,tb);
-            }
-            tb.beginFetch();
-          }
-          throttleBinArray[i] = tb;
-          lastCreated = i + 1;
-        }
+        if (fetchThrottler.obtainFetchDocumentPermission() == false)
+          throw new IllegalStateException("Unexpected return value from obtainFetchDocumentPermission()");
       }
       catch (InterruptedException e)
       {
         throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
       }
-      finally
-      {
-        if (lastCreated != throttleBinArray.length)
-        {
-          for (int i = 0; i < lastCreated; i++)
-          {
-            throttleBinArray[i].abortFetch();
-          }
-        }
-      }
     }
 
     /** Execute the fetch and get the return code.  This method uses the
@@ -1600,7 +657,7 @@ public class ThrottledFetcher
       //httpClient.setCookieStore(cookieStore);
       
       // Create the thread
-      methodThread = new ExecuteMethodThread(this, httpClient, fetchMethod, cookieStore);
+      methodThread = new ExecuteMethodThread(this, fetchThrottler, httpClient, fetchMethod, cookieStore);
       try
       {
         methodThread.start();
@@ -1893,17 +950,6 @@ public class ThrottledFetcher
           methodThread.abort();
 
         long endTime = System.currentTimeMillis();
-        int i = 0;
-        while (i < throttleBinArray.length)
-        {
-          synchronized (throttleBins)
-          {
-            if (throttleBinArray[i].endFetch())
-              throttleBins.remove(throttleBinArray[i].getBinName());
-          }
-          throttleBinArray[i] = null;
-          i++;
-        }
 
         activities.recordActivity(new Long(startFetchTime),WebcrawlerConnector.ACTIVITY_FETCH,
           new Long(fetchCounter),myUrl,Integer.toString(statusCode),(throwable==null)?null:throwable.getMessage(),null);
@@ -1945,44 +991,13 @@ public class ThrottledFetcher
 
     }
 
-    /** Close the connection.  Call this to end this server connection.
+    /** Close the connection.  Call this to return the connection to its pool.
     */
     @Override
     public void close()
-      throws ManifoldCFException
     {
-      synchronized (poolLock)
-      {
-        // Verify that all the connections that exist are in fact sane
-        synchronized (connectionBins)
-        {
-          for (String connectionName : connectionBins.keySet())
-          {
-            ConnectionBin cb = connectionBins.get(connectionName);
-            //cb.sanityCheck();
-          }
-        }
-
-        // Leave the connection alive, but mark it as inactive, and return it to the appropriate pools.
-        isActive = false;
-        inactiveTime = System.currentTimeMillis();
-        int i = 0;
-        while (i < connectionBinArray.length)
-        {
-          connectionBinArray[i++].addToPool(this);
-        }
-        // Verify that all the connections that exist are in fact sane
-        synchronized (connectionBins)
-        {
-          for (String connectionName : connectionBins.keySet())
-          {
-            ConnectionBin cb = connectionBins.get(connectionName);
-            //cb.sanityCheck();
-          }
-        }
-        // Wake up everything waiting on the pool lock
-        poolLock.notifyAll();
-      }
+      expireTime = System.currentTimeMillis() + idleTimeout;
+      myPool.release(this);
     }
     
     protected void handleHTTPException(HttpException e, String activity)
@@ -2048,17 +1063,18 @@ public class ThrottledFetcher
   */
   protected static class ThrottledInputstream extends InputStream
   {
-    /** Stream throttling parameters */
-    protected double minimumMillisecondsPerBytePerServer;
+    /** Stream throttler */
+    protected final IStreamThrottler streamThrottler;
     /** The throttled connection we belong to */
-    protected ThrottledConnection throttledConnection;
+    protected final ThrottledConnection throttledConnection;
     /** The stream we are wrapping. */
-    protected InputStream inputStream;
+    protected final InputStream inputStream;
 
     /** Constructor.
     */
-    public ThrottledInputstream(ThrottledConnection connection, InputStream is)
+    public ThrottledInputstream(IStreamThrottler streamThrottler, ThrottledConnection connection, InputStream is)
     {
+      this.streamThrottler = streamThrottler;
       this.throttledConnection = connection;
       this.inputStream = is;
     }
@@ -2126,7 +1142,8 @@ public class ThrottledFetcher
     {
       try
       {
-        throttledConnection.beginRead(len);
+        if (streamThrottler.obtainReadPermission(len) == false)
+          throw new IllegalStateException("Unexpected result calling obtainReadPermission()");
         int amt = 0;
         try
         {
@@ -2136,10 +1153,10 @@ public class ThrottledFetcher
         finally
         {
           if (amt == -1)
-            throttledConnection.endRead(len,0);
+            streamThrottler.releaseReadPermission(len,0);
           else
           {
-            throttledConnection.endRead(len,amt);
+            streamThrottler.releaseReadPermission(len,amt);
             throttledConnection.logFetchCount(amt);
           }
         }
@@ -2226,6 +1243,10 @@ public class ThrottledFetcher
       {
         Logging.connectors.debug("IO Exception trying to close connection: "+e.getMessage(),e);
       }
+      finally
+      {
+        streamThrottler.closeStream();
+      }
     }
 
   }
@@ -2298,6 +1319,8 @@ public class ThrottledFetcher
   {
     /** The connection */
     protected final ThrottledConnection theConnection;
+    /** The fetch throttler */
+    protected final IFetchThrottler fetchThrottler;
     /** Client and method, all preconfigured */
     protected final AbstractHttpClient httpClient;
     protected final HttpRequestBase executeMethod;
@@ -2317,12 +1340,13 @@ public class ThrottledFetcher
 
     protected Throwable generalException = null;
     
-    public ExecuteMethodThread(ThrottledConnection theConnection,
+    public ExecuteMethodThread(ThrottledConnection theConnection, IFetchThrottler fetchThrottler,
       AbstractHttpClient httpClient, HttpRequestBase executeMethod, CookieStore cookieStore)
     {
       super();
       setDaemon(true);
       this.theConnection = theConnection;
+      this.fetchThrottler = fetchThrottler;
       this.httpClient = httpClient;
       this.executeMethod = executeMethod;
       this.cookieStore = cookieStore;
@@ -2419,7 +1443,7 @@ public class ThrottledFetcher
                   bodyStream = response.getEntity().getContent();
                   if (bodyStream != null)
                   {
-                    bodyStream = new ThrottledInputstream(theConnection,bodyStream);
+                    bodyStream = new ThrottledInputstream(fetchThrottler.createFetchStream(),theConnection,bodyStream);
                     if (gzip)
                       bodyStream = new GZIPInputStream(bodyStream);
                     else if (deflate)
@@ -2741,4 +1765,252 @@ public class ThrottledFetcher
 
   }
 
+  /** Connection pool key.  Identifies the connection parameters shared by every connection in one pool. */
+  protected static class ConnectionPoolKey
+  {
+    protected final String protocol;
+    protected final String server;
+    protected final int port;
+    protected final PageCredentials authentication;
+    protected final String trustStoreString;
+    protected final String proxyHost;
+    protected final int proxyPort;
+    protected final String proxyAuthDomain;
+    protected final String proxyAuthUsername;
+    protected final String proxyAuthPassword;
+    /** Constructor.  protocol and server must be non-null (hashCode() dereferences them); the other objects may be null. */
+    public ConnectionPoolKey(String protocol,
+      String server, int port, PageCredentials authentication,
+      String trustStoreString, String proxyHost, int proxyPort,
+      String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
+    {
+      this.protocol = protocol;
+      this.server = server;
+      this.port = port;
+      this.authentication = authentication;
+      this.trustStoreString = trustStoreString;
+      this.proxyHost = proxyHost;
+      this.proxyPort = proxyPort;
+      this.proxyAuthDomain = proxyAuthDomain;
+      this.proxyAuthUsername = proxyAuthUsername;
+      this.proxyAuthPassword = proxyAuthPassword;
+    }
+    /** Hash over exactly the fields that equals() compares, so equal keys always hash identically. */
+    @Override
+    public int hashCode()
+    {
+      return protocol.hashCode() +
+        server.hashCode() +
+        (port * 31) +
+        ((authentication==null)?0:authentication.hashCode()) +
+        ((trustStoreString==null)?0:trustStoreString.hashCode()) +
+        ((proxyHost==null)?0:proxyHost.hashCode()) +
+        (proxyPort * 29) +
+        ((proxyAuthDomain==null)?0:proxyAuthDomain.hashCode()) +
+        ((proxyAuthUsername==null)?0:proxyAuthUsername.hashCode()) +
+        ((proxyAuthPassword==null)?0:proxyAuthPassword.hashCode());
+    }
+    /** Keys are equal when every field matches, protocol included (different protocols get separate pools); a null field matches only null. */
+    @Override
+    public boolean equals(Object o)
+    {
+      if (!(o instanceof ConnectionPoolKey))
+        return false;
+      ConnectionPoolKey other = (ConnectionPoolKey)o;
+      if (!protocol.equals(other.protocol) || !server.equals(other.server) ||
+        port != other.port || proxyPort != other.proxyPort)
+        return false;
+      if (authentication == null || other.authentication == null)
+      {
+        if (authentication != other.authentication)
+          return false;
+      }
+      else
+      {
+        if (!authentication.equals(other.authentication))
+          return false;
+      }
+      if (trustStoreString == null || other.trustStoreString == null)
+      {
+        if (trustStoreString != other.trustStoreString)
+          return false;
+      }
+      else
+      {
+        if (!trustStoreString.equals(other.trustStoreString))
+          return false;
+      }
+      if (proxyHost == null || other.proxyHost == null)
+      {
+        if (proxyHost != other.proxyHost)
+          return false;
+      }
+      else
+      {
+        if (!proxyHost.equals(other.proxyHost))
+          return false;
+      }
+      if (proxyAuthDomain == null || other.proxyAuthDomain == null)
+      {
+        if (proxyAuthDomain != other.proxyAuthDomain)
+          return false;
+      }
+      else
+      {
+        if (!proxyAuthDomain.equals(other.proxyAuthDomain))
+          return false;
+      }
+      if (proxyAuthUsername == null || other.proxyAuthUsername == null)
+      {
+        if (proxyAuthUsername != other.proxyAuthUsername)
+          return false;
+      }
+      else
+      {
+        if (!proxyAuthUsername.equals(other.proxyAuthUsername))
+          return false;
+      }
+      if (proxyAuthPassword == null || other.proxyAuthPassword == null)
+      {
+        if (proxyAuthPassword != other.proxyAuthPassword)
+          return false;
+      }
+      else
+      {
+        if (!proxyAuthPassword.equals(other.proxyAuthPassword))
+          return false;
+      }
+      return true;
+    }
+  }
+  
+  /** Each connection pool holds identical, interchangeable connections we can draw on.
+  */
+  protected static class ConnectionPool
+  {
+    /** Throttler */
+    protected final IConnectionThrottler connectionThrottler;
+    
+    // If we need to create a connection, these are what we use
+    
+    protected final String protocol;
+    protected final String server;
+    protected final int port;
+    protected final PageCredentials authentication;
+    protected final javax.net.ssl.SSLSocketFactory baseFactory;
+    protected final String proxyHost;
+    protected final int proxyPort;
+    protected final String proxyAuthDomain;
+    protected final String proxyAuthUsername;
+    protected final String proxyAuthPassword;
+
+    /** The actual pool of connections */
+    protected final List<IThrottledConnection> connections = new ArrayList<IThrottledConnection>();
+    /** Constructor.  Every parameter after the throttler is used only to build newly created connections. */
+    public ConnectionPool(IConnectionThrottler connectionThrottler,
+      String protocol,
+      String server, int port, PageCredentials authentication,
+      javax.net.ssl.SSLSocketFactory baseFactory,
+      String proxyHost, int proxyPort,
+      String proxyAuthDomain, String proxyAuthUsername, String proxyAuthPassword)
+    {
+      this.connectionThrottler = connectionThrottler;
+      
+      this.protocol = protocol;
+      this.server = server;
+      this.port = port;
+      this.authentication = authentication;
+      this.baseFactory = baseFactory;
+      this.proxyHost = proxyHost;
+      this.proxyPort = proxyPort;
+      this.proxyAuthDomain = proxyAuthDomain;
+      this.proxyAuthUsername = proxyAuthUsername;
+      this.proxyAuthPassword = proxyAuthPassword;
+    }
+    /** Obtain a connection, blocking until the throttler allows one; it is either taken from the pool or newly created. */
+    public IThrottledConnection grab()
+      throws InterruptedException
+    {
+      // Wait for a connection
+      int result = connectionThrottler.waitConnectionAvailable();
+      if (result == IConnectionThrottler.CONNECTION_FROM_POOL)
+      {
+        // We are guaranteed to have a connection in the pool, unless there's a coding error.
+        synchronized (connections)
+        {
+          return connections.remove(connections.size()-1);
+        }
+      }
+      else if (result == IConnectionThrottler.CONNECTION_FROM_CREATION)
+      {
+        return new ThrottledConnection(this,connectionThrottler.getNewConnectionFetchThrottler(),
+          protocol,server,port,authentication,baseFactory,
+          proxyHost,proxyPort,
+          proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
+      }
+      else
+        throw new IllegalStateException("Unexpected return value from waitConnectionAvailable(): "+result);
+    }
+    /** Return a connection: the throttler decides whether it is destroyed or put back into the pool. */
+    public void release(IThrottledConnection connection)
+    {
+      if (connectionThrottler.noteReturnedConnection())
+      {
+        // Destroy this connection
+        connection.destroy();
+        connectionThrottler.noteConnectionDestroyed();
+      }
+      else
+      {
+        // Return to pool
+        synchronized (connections)
+        {
+          connections.add(connection);
+        }
+        connectionThrottler.noteConnectionReturnedToPool();
+      }
+    }
+    /** Destroy pooled connections the throttler says are over quota, then pooled connections that have expired. */
+    public void flushIdleConnections()
+    {
+      long currentTime = System.currentTimeMillis();
+      // First, remove connections that are over the quota
+      while (connectionThrottler.checkDestroyPooledConnection())
+      {
+        // Destroy the oldest ones first.  Use destroy(), not close(): close() hands it to release(), which may re-pool it.
+        IThrottledConnection connection;
+        synchronized (connections)
+        {
+          connection = connections.remove(0);
+        }
+        connection.destroy();
+        connectionThrottler.noteConnectionDestroyed();
+      }
+      // Now, get rid of expired connections
+      while (true)
+      {
+        boolean expired;
+        synchronized (connections)
+        {
+          expired = connections.size() > 0 && connections.get(0).hasExpired(currentTime);
+        }
+        if (!expired)
+          break;
+        // We found an expired connection!  Now tell the throttler that, and see if it agrees.
+        // If it declines, stop; otherwise the same expired head would be re-checked indefinitely.
+        if (!connectionThrottler.checkExpireConnection())
+          break;
+        // Remove a connection from the pool, and destroy it.
+        // It's not guaranteed to be an expired one, but that's a rare occurrence, we expect.
+        IThrottledConnection connection;
+        synchronized (connections)
+        {
+          connection = connections.remove(0);
+        }
+        connection.destroy();
+        connectionThrottler.noteConnectionDestroyed();
+      }
+    }
+    
+  }
 }

Modified: manifoldcf/trunk/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/WebcrawlerConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/WebcrawlerConnector.java?rev=1551707&r1=1551706&r2=1551707&view=diff
==============================================================================
--- manifoldcf/trunk/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/WebcrawlerConnector.java (original)
+++ manifoldcf/trunk/connectors/webcrawler/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/webcrawler/WebcrawlerConnector.java Tue Dec 17 21:12:42 2013
@@ -160,6 +160,8 @@ public class WebcrawlerConnector extends
   protected int connectionTimeoutMilliseconds = 60000;
   /** Socket timeout, milliseconds */
   protected int socketTimeoutMilliseconds = 300000;
+  /** Throttle group name */
+  protected String throttleGroupName = null;
 
   // Canonicalization enabling/disabling.  Eventually this will probably need to be by regular expression.
 
@@ -354,6 +356,9 @@ public class WebcrawlerConnector extends
     {
       String x;
 
+      // Either set this from the connection name, or just have one.  Right now, we have one.
+      String throttleGroupName = "";
+      
       String emailAddress = params.getParameter(WebcrawlerConfig.PARAMETER_EMAIL);
       if (emailAddress == null)
         throw new ManifoldCFException("Missing email address");
@@ -406,7 +411,7 @@ public class WebcrawlerConnector extends
   public void poll()
     throws ManifoldCFException
   {
-    ThrottledFetcher.flushIdleConnections();
+    ThrottledFetcher.flushIdleConnections(currentContext);
   }
 
   /** Check status of connection.
@@ -425,6 +430,7 @@ public class WebcrawlerConnector extends
   public void disconnect()
     throws ManifoldCFException
   {
+    throttleGroupName = null;
     throttleDescription = null;
     credentialsDescription = null;
     trustsDescription = null;
@@ -711,7 +717,9 @@ public class WebcrawlerConnector extends
 
                   // Prepare to perform the fetch, and decide what to do with the document.
                   //
-                  IThrottledConnection connection = ThrottledFetcher.getConnection(protocol,ipAddress,port,
+                  IThrottledConnection connection = ThrottledFetcher.getConnection(currentContext,
+                    throttleGroupName,
+                    protocol,ipAddress,port,
                     credential,trustStore,throttleDescription,binNames,connectionLimit,
                     proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
                   try
@@ -5126,7 +5134,8 @@ public class WebcrawlerConnector extends
       // We've successfully obtained a lock on reading robots for this server!  Now, guarantee that we'll free it, by instantiating a try/finally
       try
       {
-        IThrottledConnection connection = ThrottledFetcher.getConnection(protocol,hostIPAddress,port,credential,
+        IThrottledConnection connection = ThrottledFetcher.getConnection(currentContext,throttleGroupName,
+          protocol,hostIPAddress,port,credential,
           trustStore,throttleDescription,binNames,connectionLimit,
           proxyHost,proxyPort,proxyAuthDomain,proxyAuthUsername,proxyAuthPassword);
         try

Modified: manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/IdleCleanupThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/IdleCleanupThread.java?rev=1551707&r1=1551706&r2=1551707&view=diff
==============================================================================
--- manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/IdleCleanupThread.java (original)
+++ manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/IdleCleanupThread.java Tue Dec 17 21:12:42 2013
@@ -55,6 +55,8 @@ public class IdleCleanupThread extends T
       ICacheManager cacheManager = CacheManagerFactory.make(threadContext);
       // Get the output connector pool handle
       IOutputConnectorPool outputConnectorPool = OutputConnectorPoolFactory.make(threadContext);
+      // Throttler subsystem
+      IThrottleGroups throttleGroups = ThrottleGroupsFactory.make(threadContext);
       
       /* For HSQLDB debugging...
       IDBInterface database = DBInterfaceFactory.make(threadContext,
@@ -88,6 +90,9 @@ public class IdleCleanupThread extends T
           
           // Do the cleanup
           outputConnectorPool.pollAllConnectors();
+          // Poll connection bins
+          throttleGroups.poll();
+          // Expire objects
           cacheManager.expireObjects(System.currentTimeMillis());
           
           // Sleep for the retry interval.

Modified: manifoldcf/trunk/framework/build.xml
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/build.xml?rev=1551707&r1=1551706&r2=1551707&view=diff
==============================================================================
--- manifoldcf/trunk/framework/build.xml (original)
+++ manifoldcf/trunk/framework/build.xml Tue Dec 17 21:12:42 2013
@@ -1482,6 +1482,7 @@
             <test name="org.apache.manifoldcf.core.common.DateTest" todir="test-output"/>
             <test name="org.apache.manifoldcf.core.fuzzyml.TestFuzzyML" todir="test-output"/>
             <test name="org.apache.manifoldcf.core.lockmanager.TestZooKeeperLocks" todir="test-output"/>
+            <test name="org.apache.manifoldcf.core.throttler.TestThrottler" todir="test-output"/>
 
         </junit>
     </target>

Modified: manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/system/ManifoldCF.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/system/ManifoldCF.java?rev=1551707&r1=1551706&r2=1551707&view=diff
==============================================================================
--- manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/system/ManifoldCF.java (original)
+++ manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/system/ManifoldCF.java Tue Dec 17 21:12:42 2013
@@ -253,6 +253,8 @@ public class ManifoldCF
           masterDatabaseUsername = LockManagerFactory.getStringProperty(threadContext,masterDatabaseUsernameProperty,"manifoldcf");
           masterDatabasePassword = LockManagerFactory.getStringProperty(threadContext,masterDatabasePasswordProperty,"local_pg_passwd");
 
+          // Register the throttler for cleanup on shutdown
+          addShutdownHook(new ThrottlerShutdown());
           // Register the file tracker for cleanup on shutdown
           tracker = new FileTrack();
           addShutdownHook(tracker);
@@ -1383,6 +1385,38 @@ public class ManifoldCF
 
   }
 
+  /** Shutdown hook that destroys the throttle groups obtained from ThrottleGroupsFactory; registered via addShutdownHook(). */
+  protected static class ThrottlerShutdown implements IShutdownHook
+  {
+    public ThrottlerShutdown()
+    {
+    }
+    /** Obtain the IThrottleGroups handle for the given thread context and call destroy() on it. */
+    @Override
+    public void doCleanup(IThreadContext threadContext)
+      throws ManifoldCFException
+    {
+      IThrottleGroups connectionThrottler = ThrottleGroupsFactory.make(threadContext);
+      connectionThrottler.destroy();
+    }
+    // NOTE(review): finalize() below also calls doCleanup(), so destroy() may run twice; confirm it is idempotent.
+    /** Finalizer, intended to catch the class unloading that Tomcat 5.5 performs: runs doCleanup() with a
+    * context from ThreadContextFactory.make(), and always chains to super.finalize(). Object.finalize() is deprecated since Java 9. */
+    protected void finalize()
+      throws Throwable
+    {
+      try
+      {
+        doCleanup(ThreadContextFactory.make());
+      }
+      finally
+      {
+        super.finalize();
+      }
+    }
+
+  }
+  
   /** Class that cleans up database handles on exit */
   protected static class DatabaseShutdown implements IShutdownHook
   {