You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/13 02:30:11 UTC
svn commit: r1550611 - in
/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler:
ConnectionBin.java FetchBin.java
Author: kwright
Date: Fri Dec 13 01:30:10 2013
New Revision: 1550611
URL: http://svn.apache.org/r1550611
Log:
Create local bins for connection allocation, fetch rate waiting, and byte rate waiting.
Added:
manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java (with props)
Modified:
manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java
Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java?rev=1550611&r1=1550610&r2=1550611&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java Fri Dec 13 01:30:10 2013
@@ -28,20 +28,29 @@ import org.apache.manifoldcf.core.system
* connections, and pooling established connections, is functionality that must reside in the
* caller.
*
+* The 'connections' each connection bin tracks are connections outstanding that share this bin name.
+* Not all such connections are identical; some may in fact have entirely different sets of
+* bins associated with them, but they all have the specific bin in common. Since each bin has its
+* own unique limit, this effectively means that in order to get a connection, you need to find an
+* available slot in ALL of its constituent connection bins. If the connections are pooled, it makes
+* the most sense to divide the pool up by characteristics such that identical connections are all
+* handled together - and it is reasonable to presume that an identical connection has identical
+* connection bins.
+*
* NOTE WELL: This is entirely local in operation
*/
public class ConnectionBin
{
/** This is the bin name which this connection pool belongs to */
protected final String binName;
- /** This is the number of connections in this bin that are signed out and presumably in use */
+ /** This is the maximum number of active connections allowed for this bin */
+ protected int maxActiveConnections = 0;
+ /** This is the number of connections in this bin that have been reserved - that is, they
+ * are promised to various callers, but those callers have not yet committed to obtaining them. */
+ protected int reservedConnections = 0;
+ /** This is the number of connections in this bin that are connected; immaterial whether they are
+ * in use or in a pool somewhere. */
protected int inUseConnections = 0;
- /** This is the last time a fetch was done on this bin */
- protected long lastFetchTime = 0L;
- /** This object is what we synchronize on when we are waiting on a connection to free up for this
- * bin. This is a separate object, because we also want to protect the integrity of the
- * ConnectionBin object itself, for which we'll use the ConnectionBin's synchronizer. */
- protected final Integer connectionWait = new Integer(0);
/** Constructor. */
public ConnectionBin(String binName)
@@ -55,39 +64,67 @@ public class ConnectionBin
return binName;
}
- /** Note the creation of an active connection that belongs to this bin. The slots all must
- * have been reserved prior to the connection being created.
+ /** Update the maximum number of active connections.
*/
- public synchronized void noteConnectionCreation()
+ public synchronized void updateMaxActiveConnections(int maxActiveConnections)
{
- inUseConnections++;
+ // Update the number and wake up any waiting threads; they will take care of everything.
+ this.maxActiveConnections = maxActiveConnections;
+ notifyAll();
+ }
+
+ /** Reserve a connection from this bin. If there is no connection yet available to reserve, wait
+ * until there is. */
+ public synchronized void reserveAConnection()
+ throws InterruptedException
+ {
+ // Reserved connections keep a slot available which can't be used by anyone else.
+ // Connection bins are always sorted so that deadlocks can't occur.
+ // Once all slots are reserved, the caller will go ahead and create the necessary connection
+ // and convert the reservation to a new connection.
+ while (true)
+ {
+ if (inUseConnections + reservedConnections < maxActiveConnections)
+ {
+ reservedConnections++;
+ return;
+ }
+ // Wait for a connection to free up. Note that it is up to the caller to free stuff up.
+ wait();
+ }
}
-
- /** Note the destruction of an active connection that belongs to this bin.
+
+ /** Clear reservation.
*/
- public synchronized void noteConnectionDestruction()
+ public synchronized void clearReservation()
{
- inUseConnections--;
+ if (reservedConnections == 0)
+ throw new IllegalStateException("Can't clear a reservation we don't have");
+ reservedConnections--;
+ notifyAll();
}
-
- /** Note a new time for connection fetch for this pool.
- *@param currentTime is the time the fetch was started.
+
+ /** Note the creation of an active connection that belongs to this bin. The connection MUST
+ * have been reserved prior to the connection being created.
*/
- public synchronized void setLastFetchTime(long currentTime)
+ public synchronized void noteConnectionCreation()
{
- if (currentTime > lastFetchTime)
- lastFetchTime = currentTime;
+ if (reservedConnections == 0)
+ throw new IllegalStateException("Creating a connection when no connection slot reserved!");
+ reservedConnections--;
+ inUseConnections++;
+ // No notification needed because the total number of reserved+active connections did not change.
}
- /** Get the last fetch time.
- *@return the time.
+ /** Note the destruction of an active connection that belongs to this bin.
*/
- public synchronized long getLastFetchTime()
+ public synchronized void noteConnectionDestruction()
{
- return lastFetchTime;
+ inUseConnections--;
+ notifyAll();
}
- /** Count connections that are in use.
+ /** Count connections that are active.
*@return connections that are in use.
*/
public synchronized int countConnections()
Added: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java?rev=1550611&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java (added)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java Fri Dec 13 01:30:10 2013
@@ -0,0 +1,129 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.core.throttler;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.core.system.ManifoldCF;
+
+/** Fetch-rate tracking for a bin.
+*
+* This class keeps track of the information needed to apply fetch rate throttling on a
+* bin-by-bin basis: the time of the last fetch, the minimum interval between fetches, and
+* whether the right to perform the next fetch is currently reserved by some caller.
+*
+* The state-changing methods are synchronized on this object; waiting threads block in wait()
+* on it until another method changes the state and calls notifyAll().
+*
+* NOTE WELL: This is entirely local in operation
+*/
+public class FetchBin
+{
+  /** This is the name of the bin whose fetch rate is tracked */
+  protected final String binName;
+  /** This is the last time a fetch was done on this bin */
+  protected long lastFetchTime = 0L;
+  /** This is the minimum time between fetches for this bin, in ms.  Long.MAX_VALUE means
+  * no usable interval has been set, and waitNextFetch() will block until one is. */
+  protected long minTimeBetweenFetches = Long.MAX_VALUE;
+  /** Is the next fetch reserved? */
+  protected boolean reserveNextFetch = false;
+
+  /** Constructor.
+  *@param binName is the name of the bin.
+  */
+  public FetchBin(String binName)
+  {
+    this.binName = binName;
+  }
+
+  /** Get the bin name.
+  *@return the bin name.
+  */
+  public String getBinName()
+  {
+    return binName;
+  }
+
+  /** Update the minimum time between fetches.
+  *@param minTimeBetweenFetches is the new minimum interval between fetches, in ms.
+  */
+  public synchronized void updateMinTimeBetweenFetches(long minTimeBetweenFetches)
+  {
+    // Update the interval and wake up any waiting threads; they will recompute their waits.
+    this.minTimeBetweenFetches = minTimeBetweenFetches;
+    notifyAll();
+  }
+
+  /** Reserve a request to fetch a document from this bin.  The actual fetch is not yet committed
+  * with this call, but if it succeeds for all bins associated with the document, then the caller
+  * has permission to do the fetch, and can update the last fetch time.
+  *@throws InterruptedException if interrupted while waiting for the reservation to free up.
+  */
+  public synchronized void reserveFetchRequest()
+    throws InterruptedException
+  {
+    // Only one caller may hold the next-fetch reservation; wait until it is released.
+    while (reserveNextFetch)
+      wait();
+    reserveNextFetch = true;
+  }
+
+  /** Clear reserved request.
+  *@throws IllegalStateException if no fetch reservation is outstanding.
+  */
+  public synchronized void clearReservation()
+  {
+    if (!reserveNextFetch)
+      throw new IllegalStateException("Can't clear a fetch reservation we don't have");
+    reserveNextFetch = false;
+    // Wake up threads blocked in reserveFetchRequest(); nothing else would notify them.
+    notifyAll();
+  }
+
+  /** Wait the necessary time to do the fetch.  Presumes we've reserved the next fetch
+  * rights already, via reserveFetchRequest().
+  *@throws InterruptedException if interrupted while waiting.
+  *@throws IllegalStateException if no fetch request is reserved.
+  */
+  public synchronized void waitNextFetch()
+    throws InterruptedException
+  {
+    if (!reserveNextFetch)
+      throw new IllegalStateException("No fetch request reserved!");
+
+    while (true)
+    {
+      if (minTimeBetweenFetches == Long.MAX_VALUE)
+      {
+        // wait forever - but eventually someone will set a smaller interval and wake us up.
+        wait();
+      }
+      else
+      {
+        long currentTime = System.currentTimeMillis();
+        // Subtract the elapsed time from the interval; adding the interval to lastFetchTime
+        // could overflow for a very large interval and wrongly end the wait.
+        long waitAmt = minTimeBetweenFetches - (currentTime - lastFetchTime);
+        if (waitAmt <= 0L)
+          break;
+        wait(waitAmt);
+      }
+    }
+  }
+
+  /** Note the beginning of fetch of a document from this bin.  This also releases the
+  * next-fetch reservation.
+  *@param currentTime is the actual time the fetch was started.
+  */
+  public synchronized void beginFetch(long currentTime)
+  {
+    if (currentTime > lastFetchTime)
+      lastFetchTime = currentTime;
+    reserveNextFetch = false;
+    // The reservation is released, so wake up threads blocked in reserveFetchRequest().
+    notifyAll();
+  }
+
+}
+
Propchange: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java
------------------------------------------------------------------------------
svn:keywords = Id