You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/14 21:21:54 UTC
svn commit: r1550966 - in
/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler:
ConnectionBin.java Throttler.java
Author: kwright
Date: Sat Dec 14 20:21:54 2013
New Revision: 1550966
URL: http://svn.apache.org/r1550966
Log:
Hook up connection bins
Modified:
manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java
manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java
Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java?rev=1550966&r1=1550965&r2=1550966&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java Sat Dec 14 20:21:54 2013
@@ -41,6 +41,8 @@ import org.apache.manifoldcf.core.system
*/
public class ConnectionBin
{
+ /** True if this bin is still alive */
+ protected boolean isAlive = true;
/** This is the bin name which this connection pool belongs to */
protected final String binName;
/** This is the maximum number of active connections allowed for this bin */
@@ -74,8 +76,10 @@ public class ConnectionBin
}
/** Reserve a connection from this bin. If there is no connection yet available to reserve, wait
- * until there is. */
- public synchronized void reserveAConnection()
+ * until there is.
+ *@return true if a connection was reserved, false if the bin was shut down before one became available.
+ */
+ public synchronized boolean reserveAConnection()
throws InterruptedException
{
// Reserved connections keep a slot available which can't be used by anyone else.
@@ -84,10 +88,12 @@ public class ConnectionBin
// and convert the reservation to a new connection.
while (true)
{
+ if (!isAlive)
+ return false;
if (inUseConnections + reservedConnections < maxActiveConnections)
{
reservedConnections++;
- return;
+ return true;
}
// Wait for a connection to free up. Note that it is up to the caller to free stuff up.
wait();
@@ -131,5 +137,13 @@ public class ConnectionBin
{
return inUseConnections;
}
+
+ /** Shut down the bin, and release everything that is waiting on it.
+ */
+ public synchronized void shutDown()
+ {
+ isAlive = false;
+ notifyAll();
+ }
}
Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java?rev=1550966&r1=1550965&r2=1550966&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java Sat Dec 14 20:21:54 2013
@@ -370,12 +370,52 @@ public class Throttler
public IFetchThrottler obtainConnectionPermission(String[] binNames)
throws InterruptedException
{
- // First, make sure all the bins exist
- // MHL
- // Reserve a slot in all bins
- // MHL
- // Wait on each reserved bin in turn
- // MHL
+ // First, make sure all the bins exist, and reserve a slot in each
+ int i = 0;
+ while (i < binNames.length)
+ {
+ String binName = binNames[i];
+ ConnectionBin bin;
+ synchronized (connectionBins)
+ {
+ bin = connectionBins.get(binName);
+ if (bin == null)
+ {
+ bin = new ConnectionBin(binName);
+ connectionBins.put(binName, bin);
+ }
+ }
+ // Reserve a slot
+ if (!bin.reserveAConnection())
+ {
+ // Release previous reservations, and return null
+ while (i > 0)
+ {
+ i--;
+ binName = binNames[i];
+ synchronized (connectionBins)
+ {
+ bin = connectionBins.get(binName);
+ }
+ if (bin != null)
+ bin.clearReservation();
+ }
+ return null;
+ }
+ i++;
+ }
+
+ // All reservations have been made! Convert them.
+ for (String binName : binNames)
+ {
+ ConnectionBin bin;
+ synchronized (connectionBins)
+ {
+ bin = connectionBins.get(binName);
+ }
+ if (bin != null)
+ bin.noteConnectionCreation();
+ }
return new FetchThrottler(this, binNames);
}
@@ -391,7 +431,16 @@ public class Throttler
/** Release connection */
public void releaseConnectionPermission(String[] binNames)
{
- // MHL
+ for (String binName : binNames)
+ {
+ ConnectionBin bin;
+ synchronized (connectionBins)
+ {
+ bin = connectionBins.get(binName);
+ }
+ if (bin != null)
+ bin.noteConnectionDestruction();
+ }
}
// IFetchThrottler support methods