You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/15 20:59:22 UTC
svn commit: r1551046 - in
/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core:
interfaces/IConnectionThrottler.java throttler/ConnectionBin.java
throttler/Throttler.java
Author: kwright
Date: Sun Dec 15 19:59:21 2013
New Revision: 1551046
URL: http://svn.apache.org/r1551046
Log:
A more appropriate API for connection throttling, which takes into account that there's likely to be a pool underneath it all.
Modified:
manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IConnectionThrottler.java
manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java
manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java
Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IConnectionThrottler.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IConnectionThrottler.java?rev=1551046&r1=1551045&r2=1551046&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IConnectionThrottler.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IConnectionThrottler.java Sun Dec 15 19:59:21 2013
@@ -21,33 +21,92 @@ package org.apache.manifoldcf.core.inter
import java.util.*;
/** An IConnectionThrottler object is not thread-local. It gates connection
-* creation.
+* creation and pool management.
+* The underlying model is a pool of connections. A connection gets pulled off the pool and
+* used to perform a fetch. If there are insufficient connections in the pool, and there is
+* sufficient capacity to create a new connection, a connection will be created instead.
+* When the fetch is done, the connection is returned, and then there is a decision whether
+* or not to put the connection back into the pool, or to destroy it. Finally, the pool is
+* periodically evaluated, and connections may be destroyed if either they have expired,
+* or the allocated connections are still over capacity.
+*
+* This object does not in itself contain a connection pool - but it is intended to assist
+* in the management of that pool. Specifically, it tracks connections that are in the
+* pool, and connections that are handed out for use, and performs ALL the waiting needed
+* due to the pool being empty and/or the number of active connections being at or over
+* the quota.
*/
public interface IConnectionThrottler
{
public static final String _rcsid = "@(#)$Id$";
- /** Get permission to use a connection, which is described by the passed array of bin names.
- * This method may block until a connection slot is available.
- * The connection can be used multiple times until the releaseConnectionPermission() method is called.
- * This persistence feature is meant to allow connections to be pooled locally by the caller.
- *@return the fetch throttler to use when performing fetches from the corresponding connection, or null if the system is being shut down.
+ // For grabbing a connection for use
+
+ /** Get the connection from the pool */
+ public final static int CONNECTION_FROM_POOL = 0;
+ /** Create a connection */
+ public final static int CONNECTION_FROM_CREATION = 1;
+ /** Pool shutting down */
+ public final static int CONNECTION_FROM_NOWHERE = -1;
+
+ /** Get permission to grab a connection for use. If this object believes there is a connection
+ * available in the pool, it will update its pool size variable and return If not, this method
+ * evaluates whether a new connection should be created. If neither condition is true, it
+ * waits until a connection is available.
+ *@return whether to take the connection from the pool, or create one, or whether the
+ * throttler is being shut down.
*/
- public IFetchThrottler obtainConnectionPermission()
+ public int waitConnectionAvailable()
throws InterruptedException;
- /** Determine whether to release a pooled connection. This method returns the number of bins
- * where the outstanding connection exceeds current quotas, indicating whether at least one with the specified
- * characteristics should be released.
- * NOTE WELL: This method cannot judge which is the best connection to be released to meet
- * quotas. The caller needs to do that based on the highest number of bins matched.
- *@return the number of bins that are over quota, or zero if none of them are. Returns Integer.MAX_VALUE if shutting down.
+ /** For a new connection, obtain the fetch throttler to use for the connection.
+ * If the result from waitConnectionAvailable() is CONNECTION_FROM_CREATION,
+ * the calling code is expected to create a connection using the result of this method.
+ *@return the fetch throttler for a new connection.
+ */
+ public IFetchThrottler getNewConnectionFetchThrottler();
+
+ /** This method indicates whether a formerly in-use connection should be placed back
+ * in the pool or destroyed.
+ *@return true if the connection should not be put into the pool but should instead
+ * simply be destroyed. If true is returned, the caller MUST call noteConnectionDestroyed()
+ * after the connection is destroyed in order for the bookkeeping to work. If false
+ * is returned, the caller MUST call noteConnectionReturnedToPool() after the connection
+ * is returned to the pool.
+ */
+ public boolean noteReturnedConnection();
+
+ /** This method calculates whether a connection should be taken from the pool and destroyed
+ /* in order to meet quota requirements. If this method returns
+ /* true, you MUST remove a connection from the pool, and you MUST call
+ /* noteConnectionDestroyed() afterwards.
+ *@return true if a pooled connection should be destroyed. If true is returned, the
+ * caller MUST call noteConnectionDestroyed() (below) in order for the bookkeeping to work.
+ */
+ public boolean checkDestroyPooledConnection();
+
+ /** Connection expiration is tricky, because even though a connection may be identified as
+ * being expired, at the very same moment it could be handed out in another thread. So there
+ * is a natural race condition present.
+ * The way the connection throttler deals with that is to allow the caller to reserve a connection
+ * for expiration. This must be called BEFORE the actual identified connection is removed from the
+ * connection pool. If the value returned by this method is "true", then a connection MUST be removed
+ * from the pool and destroyed, whether or not the identified connection is actually still available for
+ * destruction or not.
+ *@return true if a connection from the pool can be expired. If true is returned, noteConnectionDestruction()
+ * MUST be called once the connection has actually been destroyed.
+ */
+ public boolean checkExpireConnection();
+
+ /** Note that a connection has been returned to the pool. Call this method after a connection has been
+ * placed back into the pool and is available for use.
*/
- public int overConnectionQuotaCount();
+ public void noteConnectionReturnedToPool();
- /** Release permission to use one connection. This presumes that obtainConnectionPermission()
- * was called earlier by someone and was successful.
+ /** Note that a connection has been destroyed. Call this method ONLY after noteReturnedConnection()
+ * or checkDestroyPooledConnection() returns true, AND the connection has been already
+ * destroyed.
*/
- public void releaseConnectionPermission();
+ public void noteConnectionDestroyed();
}
Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java?rev=1551046&r1=1551045&r2=1551046&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java Sun Dec 15 19:59:21 2013
@@ -20,6 +20,7 @@ package org.apache.manifoldcf.core.throt
import org.apache.manifoldcf.core.interfaces.*;
import org.apache.manifoldcf.core.system.ManifoldCF;
+import java.util.concurrent.atomic.*;
/** Connection tracking for a bin.
*
@@ -75,39 +76,60 @@ public class ConnectionBin
notifyAll();
}
- /** Reserve a connection from this bin. If there is no connection yet available to reserve, wait
- * until there is.
- *@return false if the wait was aborted because the bin became inactivated.
+ /** Wait for a connection to become available, in the context of an existing connection pool.
+ *@param poolCount is the number of connections in the pool times the number of bins per connection.
+ * This parameter is only ever changed in this class!!
+ *@return a recommendation as to how to proceed, using the IConnectionThrottler values. If the
+ * recommendation is to create a connection, a slot will be reserved for that purpose. A
+ * subsequent call to noteConnectionCreation() will be needed to confirm the reservation, or clearReservation() to
+ * release the reservation.
*/
- public synchronized boolean reserveAConnection()
+ public synchronized int waitConnectionAvailable(AtomicInteger poolCount)
throws InterruptedException
{
// Reserved connections keep a slot available which can't be used by anyone else.
// Connection bins are always sorted so that deadlocks can't occur.
// Once all slots are reserved, the caller will go ahead and create the necessary connection
// and convert the reservation to a new connection.
+
while (true)
{
if (!isAlive)
- return false;
+ return IConnectionThrottler.CONNECTION_FROM_NOWHERE;
+ int currentPoolCount = poolCount.get();
+ if (currentPoolCount > 0)
+ {
+ // Recommendation is to pull the connection from the pool.
+ poolCount.set(currentPoolCount - 1);
+ return IConnectionThrottler.CONNECTION_FROM_POOL;
+ }
if (inUseConnections + reservedConnections < maxActiveConnections)
{
reservedConnections++;
- return true;
+ return IConnectionThrottler.CONNECTION_FROM_CREATION;
}
// Wait for a connection to free up. Note that it is up to the caller to free stuff up.
wait();
}
}
- /** Clear reservation.
+ /** Undo what we had decided to do before.
+ *@param recommendation is the decision returned by waitForConnection() above.
*/
- public synchronized void clearReservation()
+ public synchronized void undoReservation(int recommendation, AtomicInteger poolCount)
{
- if (reservedConnections == 0)
- throw new IllegalStateException("Can't clear a reservation we don't have");
- reservedConnections--;
- notifyAll();
+ if (recommendation == IConnectionThrottler.CONNECTION_FROM_CREATION)
+ {
+ if (reservedConnections == 0)
+ throw new IllegalStateException("Can't clear a reservation we don't have");
+ reservedConnections--;
+ notifyAll();
+ }
+ else if (recommendation == IConnectionThrottler.CONNECTION_FROM_POOL)
+ {
+ poolCount.set(poolCount.get() + 1);
+ notifyAll();
+ }
}
/** Note the creation of an active connection that belongs to this bin. The connection MUST
@@ -122,22 +144,31 @@ public class ConnectionBin
// No notification needed because the total number of reserved+active connections did not change.
}
- /** Note the destruction of an active connection that belongs to this bin.
+ /** Figure out whether we are currently over target or not for this bin.
*/
- public synchronized void noteConnectionDestruction()
+ public synchronized boolean shouldReturnedConnectionBeDestroyed()
{
- inUseConnections--;
- notifyAll();
+ // We don't count reserved connections here because those are not yet committed
+ return inUseConnections > maxActiveConnections;
}
-
- /** Count connections that are active.
- *@return connections that are in use.
+
+ /** Note a connection returned to the pool.
*/
- public synchronized int countConnections()
+ public synchronized void noteConnectionReturnedToPool(AtomicInteger poolCount)
{
- return inUseConnections;
+ poolCount.set(poolCount.get() + 1);
+ // Wake up threads possibly waiting on a pool return.
+ notifyAll();
}
+ /** Note the destruction of an active connection that belongs to this bin.
+ */
+ public synchronized void noteConnectionDestroyed()
+ {
+ inUseConnections--;
+ notifyAll();
+ }
+
/** Shut down the bin, and release everything that is waiting on it.
*/
public synchronized void shutDown()
Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java?rev=1551046&r1=1551045&r2=1551046&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java Sun Dec 15 19:59:21 2013
@@ -20,6 +20,7 @@ package org.apache.manifoldcf.core.throt
import org.apache.manifoldcf.core.interfaces.*;
import java.util.*;
+import java.util.concurrent.atomic.*;
/** A Throttler object creates a virtual pool of connections to resources
* whose access needs to be throttled in number, rate of use, and byte rate.
@@ -364,72 +365,137 @@ public class Throttler
// IConnectionThrottler support methods
- /** Obtain connection permission.
- *@return null if we are marked as 'not alive'.
+ /** Wait for a connection to become available.
+ *@param poolCount is a description of how many connections
+ * are available in the current pool, across all bins.
+ *@return the IConnectionThrottler codes for results.
*/
- public IFetchThrottler obtainConnectionPermission(String[] binNames)
+ public int waitConnectionAvailable(String[] binNames, AtomicInteger poolCount)
throws InterruptedException
{
- // First, make sure all the bins exist, and reserve a slot in each
- int i = 0;
- while (i < binNames.length)
- {
- String binName = binNames[i];
- ConnectionBin bin;
- synchronized (connectionBins)
- {
- bin = connectionBins.get(binName);
- if (bin == null)
+ // Each bin can signal something different. Bins that signal
+ // CONNECTION_FROM_NOWHERE are shutting down, but there's also
+ // apparently the conflicting possibilities of distinct answers of
+ // CONNECTION_FROM_POOL and CONNECTION_FROM_CREATION.
+ // However: the pool count we track is in fact N * the actual pool count,
+ // where N is the number of bins in each connection. This means that a conflict
+ // is ALWAYS due to two entities simultaneously calling waitConnectionAvailable(),
+ // and deadlocking each other. The solution is therefore to back off and retry.
+
+ // This is the retry loop
+ while (true)
+ {
+ int currentRecommendation = IConnectionThrottler.CONNECTION_FROM_NOWHERE;
+
+ boolean retry = false;
+
+ // First, make sure all the bins exist, and reserve a slot in each
+ int i = 0;
+ while (i < binNames.length)
+ {
+ String binName = binNames[i];
+ ConnectionBin bin;
+ synchronized (connectionBins)
{
- bin = new ConnectionBin(binName);
- connectionBins.put(binName, bin);
+ bin = connectionBins.get(binName);
+ if (bin == null)
+ {
+ bin = new ConnectionBin(binName);
+ connectionBins.put(binName, bin);
+ }
}
- }
- // Reserve a slot
- if (!bin.reserveAConnection())
+ // Reserve a slot
+ int result = bin.waitConnectionAvailable(poolCount);
+ if (result == IConnectionThrottler.CONNECTION_FROM_NOWHERE ||
+ (currentRecommendation != IConnectionThrottler.CONNECTION_FROM_NOWHERE && currentRecommendation != result))
+ {
+ // Release previous reservations, and either return, or retry
+ while (i > 0)
+ {
+ i--;
+ binName = binNames[i];
+ synchronized (connectionBins)
+ {
+ bin = connectionBins.get(binName);
+ }
+ if (bin != null)
+ bin.undoReservation(currentRecommendation, poolCount);
+ }
+ if (result == IConnectionThrottler.CONNECTION_FROM_NOWHERE)
+ return result;
+ // Break out of the outer loop so we can retry
+ retry = true;
+ break;
+ }
+ if (currentRecommendation == IConnectionThrottler.CONNECTION_FROM_NOWHERE)
+ currentRecommendation = result;
+ i++;
+ }
+
+ if (retry)
+ continue;
+
+ // Complete the reservation process (if that is what we decided)
+ if (currentRecommendation == IConnectionThrottler.CONNECTION_FROM_CREATION)
{
- // Release previous reservations, and return null
- while (i > 0)
+ // All reservations have been made! Convert them.
+ for (String binName : binNames)
{
- i--;
- binName = binNames[i];
+ ConnectionBin bin;
synchronized (connectionBins)
{
bin = connectionBins.get(binName);
}
if (bin != null)
- bin.clearReservation();
+ bin.noteConnectionCreation();
}
- return null;
}
- i++;
+
+ return currentRecommendation;
}
- // All reservations have been made! Convert them.
- for (String binName : binNames)
+ }
+
+ public IFetchThrottler getNewConnectionFetchThrottler(String[] binNames)
+ {
+ return new FetchThrottler(this, binNames);
+ }
+
+ public boolean noteReturnedConnection(String[] binNames)
+ {
+ // If ANY of the bins think the connection should be destroyed, then that will be
+ // the recommendation.
+ synchronized (connectionBins)
{
- ConnectionBin bin;
- synchronized (connectionBins)
+ boolean destroyConnection = false;
+
+ for (String binName : binNames)
{
- bin = connectionBins.get(binName);
+ ConnectionBin bin = connectionBins.get(binName);
+ if (bin != null)
+ {
+ destroyConnection |= bin.shouldReturnedConnectionBeDestroyed();
+ }
}
- if (bin != null)
- bin.noteConnectionCreation();
+
+ return destroyConnection;
}
- return new FetchThrottler(this, binNames);
}
- /** Count the number of bins that are over quota.
- *@return Integer.MAX_VALUE if shutting down.
- */
- public int overConnectionQuotaCount(String[] binNames)
+ public void noteConnectionReturnedToPool(String[] binNames, AtomicInteger poolCount)
{
- // MHL
- return Integer.MAX_VALUE;
+ synchronized (connectionBins)
+ {
+ for (String binName : binNames)
+ {
+ ConnectionBin bin = connectionBins.get(binName);
+ if (bin != null)
+ bin.noteConnectionReturnedToPool(poolCount);
+ }
+ }
}
-
- /** Release connection */
- public void releaseConnectionPermission(String[] binNames)
+
+ public void noteConnectionDestroyed(String[] binNames)
{
synchronized (connectionBins)
{
@@ -437,7 +503,7 @@ public class Throttler
{
ConnectionBin bin = connectionBins.get(binName);
if (bin != null)
- bin.noteConnectionDestruction();
+ bin.noteConnectionDestroyed();
}
}
}
@@ -657,7 +723,7 @@ public class Throttler
public synchronized void freeUnusedResources(IThreadContext threadContext)
throws ManifoldCFException
{
- // MHL
+ // Does nothing; there are not really resources to free
}
/** Destroy this pool.
@@ -707,54 +773,124 @@ public class Throttler
}
/** Connection throttler implementation class.
- * This basically stores some parameters and links back to ThrottlingGroup.
+ * This class instance stores some parameters and links back to ThrottlingGroup. But each class instance
+ * models a connection pool with the specified bins. But the description of each pool consists of more than just
+ * the bin names that describe the throttling - it also may include connection parameters which we have
+ * no insight into at this level.
+ *
+ * Thus, in order to do pool tracking properly, we cannot simply rely on the individual connection bin instances
+ * to do all the work, since they cannot distinguish between different pools properly. So that leaves us with
+ * two choices. (1) We can somehow push the separate pool instance parameters down to the connection bin
+ * level, or (2) the connection bins cannot actually do any waiting or blocking.
+ *
+ * The benefit of having blocking take place in connection bins is that they are in fact designed to be precisely
+ * the thing you would want to synchronize on. If we presume that the waits happen in those classes,
+ * then we need the ability to send in our local pool count to them, and we need to be able to "wake up"
+ * those underlying classes when the local pool count changes.
*/
protected static class ConnectionThrottler implements IConnectionThrottler
{
protected final ThrottlingGroup parent;
protected final String[] binNames;
+ // Keep track of local pool parameters.
+
+ /** This is the number of connections in the pool, times the number of bins per connection */
+ protected final AtomicInteger poolCount = new AtomicInteger(0);
+
public ConnectionThrottler(ThrottlingGroup parent, String[] binNames)
{
this.parent = parent;
this.binNames = binNames;
}
- /** Get permission to use a connection, which is described by the passed array of bin names.
- * This method may block until a connection slot is available.
- * The connection can be used multiple times until the releaseConnectionPermission() method is called.
- * This persistence feature is meant to allow connections to be pooled locally by the caller.
- *@return the fetch throttler to use when performing fetches from the corresponding connection, or null if the system is being shut down.
+ /** Get permission to grab a connection for use. If this object believes there is a connection
+ * available in the pool, it will update its pool size variable and return If not, this method
+ * evaluates whether a new connection should be created. If neither condition is true, it
+ * waits until a connection is available.
+ *@return whether to take the connection from the pool, or create one, or whether the
+ * throttler is being shut down.
*/
@Override
- public IFetchThrottler obtainConnectionPermission()
+ public int waitConnectionAvailable()
throws InterruptedException
{
- return parent.obtainConnectionPermission(binNames);
+ return parent.waitConnectionAvailable(binNames, poolCount);
+ }
+
+ /** For a new connection, obtain the fetch throttler to use for the connection.
+ * If the result from waitConnectionAvailable() is CONNECTION_FROM_CREATION,
+ * the calling code is expected to create a connection using the result of this method.
+ *@return the fetch throttler for a new connection.
+ */
+ @Override
+ public IFetchThrottler getNewConnectionFetchThrottler()
+ {
+ return parent.getNewConnectionFetchThrottler(binNames);
+ }
+
+ /** For returning a connection from use, there is only one method. This method signals
+ /* whether a formerly in-use connection should be placed back in the pool or destroyed.
+ *@return true if the connection should NOT be put into the pool but should instead
+ * simply be destroyed. If true is returned, the caller MUST call noteConnectionDestroyed()
+ * (below) in order for the bookkeeping to work.
+ */
+ @Override
+ public boolean noteReturnedConnection()
+ {
+ return parent.noteReturnedConnection(binNames);
+ }
+
+ /** This method calculates whether a connection should be taken from the pool and destroyed
+ /* in order to meet quota requirements. If this method returns
+ /* true, you MUST remove a connection from the pool, and you MUST call
+ /* noteConnectionDestroyed() afterwards.
+ *@return true if a pooled connection should be destroyed. If true is returned, the
+ * caller MUST call noteConnectionDestroyed() (below) in order for the bookkeeping to work.
+ */
+ @Override
+ public boolean checkDestroyPooledConnection()
+ {
+ // MHL
+ return false;
}
- /** Determine whether to release a pooled connection. This method returns the number of bins
- * where the outstanding connection exceeds current quotas, indicating whether at least one with the specified
- * characteristics should be released.
- * NOTE WELL: This method cannot judge which is the best connection to be released to meet
- * quotas. The caller needs to do that based on the highest number of bins matched.
- *@return the number of bins that are over quota, or zero if none of them are.
+ /** Connection expiration is tricky, because even though a connection may be identified as
+ * being expired, at the very same moment it could be handed out in another thread. So there
+ * is a natural race condition present.
+ * The way the connection throttler deals with that is to allow the caller to reserve a connection
+ * for expiration. This must be called BEFORE the actual identified connection is removed from the
+ * connection pool. If the value returned by this method is "true", then a connection MUST be removed
+ * from the pool and destroyed, whether or not the identified connection is actually still available for
+ * destruction or not.
+ *@return true if a connection from the pool can be expired. If true is returned, noteConnectionDestruction()
+ * MUST be called once the connection has actually been destroyed.
*/
@Override
- public int overConnectionQuotaCount()
+ public boolean checkExpireConnection()
{
- return parent.overConnectionQuotaCount(binNames);
+ // MHL
+ return false;
}
- /** Release permission to use one connection. This presumes that obtainConnectionPermission()
- * was called earlier by someone and was successful.
+ /** Note that a connection has been returned to the pool. Call this method after a connection has been
+ * placed back into the pool and is available for use.
*/
@Override
- public void releaseConnectionPermission()
+ public void noteConnectionReturnedToPool()
{
- parent.releaseConnectionPermission(binNames);
+ parent.noteConnectionReturnedToPool(binNames, poolCount);
}
+ /** Note that a connection has been destroyed. Call this method ONLY after noteReturnedConnection()
+ * or checkDestroyPooledConnection() returns true, AND the connection has been already
+ * destroyed.
+ */
+ @Override
+ public void noteConnectionDestroyed()
+ {
+ parent.noteConnectionDestroyed(binNames);
+ }
}
/** Fetch throttler implementation class.