You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/13 02:30:11 UTC

svn commit: r1550611 - in /manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler: ConnectionBin.java FetchBin.java

Author: kwright
Date: Fri Dec 13 01:30:10 2013
New Revision: 1550611

URL: http://svn.apache.org/r1550611
Log:
Create local bins for connection allocation, fetch rate waiting, and byte rate waiting.

Added:
    manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java   (with props)
Modified:
    manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java

Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java?rev=1550611&r1=1550610&r2=1550611&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java Fri Dec 13 01:30:10 2013
@@ -28,20 +28,29 @@ import org.apache.manifoldcf.core.system
 * connections, and pooling established connections, is functionality that must reside in the
 * caller.
 *
+* The 'connections' each connection bin tracks are connections outstanding that share this bin name.
+* Not all such connections are identical; some may in fact have entirely different sets of
+* bins associated with them, but they all have the specific bin in common.  Since each bin has its
+* own unique limit, this effectively means that in order to get a connection, you need to find an
+* available slot in ALL of its constituent connection bins.  If the connections are pooled, it makes
+* the most sense to divide the pool up by characteristics such that identical connections are all
+* handled together - and it is reasonable to presume that an identical connection has identical
+* connection bins.
+*
 * NOTE WELL: This is entirely local in operation
 */
 public class ConnectionBin
 {
   /** This is the bin name which this connection pool belongs to */
   protected final String binName;
-  /** This is the number of connections in this bin that are signed out and presumably in use */
+  /** This is the maximum number of active connections allowed for this bin */
+  protected int maxActiveConnections = 0;
+  /** This is the number of connections in this bin that have been reserved - that is, they
+  * are promised to various callers, but those callers have not yet committed to obtaining them. */
+  protected int reservedConnections = 0;
+  /** This is the number of connections in this bin that are connected; immaterial whether they are
+  * in use or in a pool somewhere. */
   protected int inUseConnections = 0;
-  /** This is the last time a fetch was done on this bin */
-  protected long lastFetchTime = 0L;
-  /** This object is what we synchronize on when we are waiting on a connection to free up for this
-  * bin.  This is a separate object, because we also want to protect the integrity of the
-  * ConnectionBin object itself, for which we'll use the ConnectionBin's synchronizer. */
-  protected final Integer connectionWait = new Integer(0);
 
   /** Constructor. */
   public ConnectionBin(String binName)
@@ -55,39 +64,67 @@ public class ConnectionBin
     return binName;
   }
 
-  /** Note the creation of an active connection that belongs to this bin.  The slots all must
-  * have been reserved prior to the connection being created.
+  /** Update the maximum number of active connections.
   */
-  public synchronized void noteConnectionCreation()
+  public synchronized void updateMaxActiveConnections(int maxActiveConnections)
   {
-    inUseConnections++;
+    // Update the number and wake up any waiting threads; they will take care of everything.
+    this.maxActiveConnections = maxActiveConnections;
+    notifyAll();
+  }
+
+  /** Reserve a connection from this bin.  If there is no connection yet available to reserve, wait
+  * until there is. */
+  public synchronized void reserveAConnection()
+    throws InterruptedException
+  {
+    // Reserved connections keep a slot available which can't be used by anyone else.
+    // Connection bins are always sorted so that deadlocks can't occur.
+    // Once all slots are reserved, the caller will go ahead and create the necessary connection
+    // and convert the reservation to a new connection.
+    while (true)
+    {
+      if (inUseConnections + reservedConnections < maxActiveConnections)
+      {
+        reservedConnections++;
+        return;
+      }
+      // Wait for a connection to free up.  Note that it is up to the caller to free stuff up.
+      wait();
+    }
   }
-
-  /** Note the destruction of an active connection that belongs to this bin.
+  
+  /** Clear reservation.
   */
-  public synchronized void noteConnectionDestruction()
+  public synchronized void clearReservation()
   {
-    inUseConnections--;
+    if (reservedConnections == 0)
+      throw new IllegalStateException("Can't clear a reservation we don't have");
+    reservedConnections--;
+    notifyAll();
   }
-
-  /** Note a new time for connection fetch for this pool.
-  *@param currentTime is the time the fetch was started.
+  
+  /** Note the creation of an active connection that belongs to this bin.  The connection MUST
+  * have been reserved prior to the connection being created.
   */
-  public synchronized void setLastFetchTime(long currentTime)
+  public synchronized void noteConnectionCreation()
   {
-    if (currentTime > lastFetchTime)
-      lastFetchTime = currentTime;
+    if (reservedConnections == 0)
+      throw new IllegalStateException("Creating a connection when no connection slot reserved!");
+    reservedConnections--;
+    inUseConnections++;
+    // No notification needed because the total number of reserved+active connections did not change.
   }
 
-  /** Get the last fetch time.
-  *@return the time.
+  /** Note the destruction of an active connection that belongs to this bin.
   */
-  public synchronized long getLastFetchTime()
+  public synchronized void noteConnectionDestruction()
   {
-    return lastFetchTime;
+    inUseConnections--;
+    notifyAll();
   }
 
-  /** Count connections that are in use.
+  /** Count connections that are active.
   *@return connections that are in use.
   */
   public synchronized int countConnections()

Added: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java?rev=1550611&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java (added)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java Fri Dec 13 01:30:10 2013
@@ -0,0 +1,129 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.core.throttler;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.core.system.ManifoldCF;
+
+/** Fetch rate tracking for a bin.
+*
+* This class keeps track of information needed to figure out fetch rate throttling for connections,
+* on a bin-by-bin basis.
+*
+* NOTE WELL: This is entirely local in operation
+*/
+public class FetchBin
+{
+  /** This is the name of the bin this object tracks */
+  protected final String binName;
+  /** This is the last time a fetch was done on this bin */
+  protected long lastFetchTime = 0L;
+  /** This is the minimum time between fetches for this bin, in ms. */
+  protected long minTimeBetweenFetches = Long.MAX_VALUE;
+  /** Is the next fetch reserved? */
+  protected boolean reserveNextFetch = false;
+
+  /** Constructor. */
+  public FetchBin(String binName)
+  {
+    this.binName = binName;
+  }
+
+  /** Get the bin name. */
+  public String getBinName()
+  {
+    return binName;
+  }
+
+  /** Update the minimum time between fetches.
+  *@param minTimeBetweenFetches is the new minimum interval, in ms; Long.MAX_VALUE blocks fetches. */
+  public synchronized void updateMinTimeBetweenFetches(long minTimeBetweenFetches)
+  {
+    // Update the number and wake up any waiting threads; they will take care of everything.
+    this.minTimeBetweenFetches = minTimeBetweenFetches;
+    // Wake up everything that's waiting.
+    notifyAll();
+  }
+
+  /** Reserve a request to fetch a document from this bin.  The actual fetch is not yet committed
+  * with this call, but if it succeeds for all bins associated with the document, then the caller
+  * has permission to do the fetch, and can update the last fetch time.  Blocks while the next
+  * fetch is already reserved; clearReservation() and beginFetch() release it. */
+  public synchronized void reserveFetchRequest()
+    throws InterruptedException
+  {
+    // First wait for the ability to even get the next fetch from this bin
+    while (true)
+    {
+      if (!reserveNextFetch)
+        break;
+      wait();
+    }
+    reserveNextFetch = true;
+  }
+
+  /** Clear reserved request, and wake up any threads waiting in reserveFetchRequest(). */
+  public synchronized void clearReservation()
+  {
+    if (!reserveNextFetch)
+      throw new IllegalStateException("Can't clear a fetch reservation we don't have");
+    reserveNextFetch = false;
+    notifyAll();
+  }
+
+  /** Wait the necessary time to do the fetch.  Presumes we've reserved the next fetch
+  * rights already, via reserveFetchRequest().
+  */
+  public synchronized void waitNextFetch()
+    throws InterruptedException
+  {
+    if (!reserveNextFetch)
+      throw new IllegalStateException("No fetch request reserved!");
+
+    while (true)
+    {
+      if (minTimeBetweenFetches == Long.MAX_VALUE)
+      {
+        // wait forever - but eventually someone will set a smaller interval and wake us up.
+        wait();
+      }
+      else
+      {
+        long currentTime = System.currentTimeMillis();
+        // Time still to wait: interval minus elapsed time (this form cannot overflow for huge intervals).
+        long waitAmt = minTimeBetweenFetches - (currentTime - lastFetchTime);
+        if (waitAmt <= 0L)
+          break;
+        wait(waitAmt);
+      }
+    }
+  }
+
+  /** Note the beginning of fetch of a document from this bin; this releases the reservation
+  * and wakes up threads waiting in reserveFetchRequest().
+  *@param currentTime is the actual time the fetch was started. */
+  public synchronized void beginFetch(long currentTime)
+  {
+    if (currentTime > lastFetchTime)
+      lastFetchTime = currentTime;
+    reserveNextFetch = false;
+    notifyAll();
+  }
+
+}
+

Propchange: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java
------------------------------------------------------------------------------
    svn:keywords = Id