You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/13 13:21:01 UTC
svn commit: r1550704 - in /manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core: interfaces/IConnectionThrottler.java interfaces/IFetchThrottler.java throttler/ConnectionThrottler.java throttler/Throttler.java
Author: kwright
Date: Fri Dec 13 12:21:00 2013
New Revision: 1550704
URL: http://svn.apache.org/r1550704
Log:
Work on the API some more
Modified:
manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IConnectionThrottler.java
manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java
manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionThrottler.java
manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java
Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IConnectionThrottler.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IConnectionThrottler.java?rev=1550704&r1=1550703&r2=1550704&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IConnectionThrottler.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IConnectionThrottler.java Fri Dec 13 12:21:00 2013
@@ -18,6 +18,8 @@
*/
package org.apache.manifoldcf.core.interfaces;
+import java.util.*;
+
/** An IConnectionThrottler object is thread-local and creates a virtual pool
* of connections to resources whose access needs to be throttled in number,
* rate of use, and byte rate.
@@ -26,29 +28,65 @@ public interface IConnectionThrottler
{
public static final String _rcsid = "@(#)$Id$";
+ /** Get all existing throttle groups for a throttle group type.
+ * The throttle group type typically describes a connector class, while the throttle group represents
+ * a namespace of bin names specific to that connector class.
+ *@param throttleGroupType is the throttle group type.
+ *@return the set of throttle groups for that group type.
+ */
+ public Set<String> getThrottleGroups(String throttleGroupType);
+
+ /** Remove a throttle group.
+ *@param throttleGroupType is the throttle group type.
+ *@param throttleGroup is the throttle group.
+ */
+ public void removeThrottleGroup(String throttleGroupType, String throttleGroup);
+
+ /** Set or update throttle specification for a throttle group. This creates the
+ * throttle group if it does not yet exist.
+ *@param throttleGroupType is the throttle group type.
+ *@param throttleGroup is the throttle group.
+ *@param throttleSpec is the desired throttle specification object.
+ */
+ public void updateThrottleSpecification(String throttleGroupType, String throttleGroup, IThrottleSpec throttleSpec);
+
/** Get permission to use a connection, which is described by the passed array of bin names.
- * The connection can be used multiple
- * times until the releaseConnectionPermission() method is called.
+ * This method may block until a connection slot is available.
+ * The connection can be used multiple times until the releaseConnectionPermission() method is called.
+ * This persistence feature is meant to allow connections to be pooled locally by the caller.
+ *@param throttleGroupType is the throttle group type.
*@param throttleGroup is the throttle group.
- *@param throttleSpec is the throttle specification to use for the throttle group,
*@param binNames is the set of bin names to throttle for, within the throttle group.
- *@param currentTime is the current time, in ms. since epoch.
- *@return the fetch throttler to use when performing fetches from this connection.
+ *@return the fetch throttler to use when performing fetches from the corresponding connection.
*/
- public IFetchThrottler obtainConnectionPermission(String throttleGroup,
- IThrottleSpec throttleSpec, String[] binNames, long currentTime)
- throws ManifoldCFException;
+ public IFetchThrottler obtainConnectionPermission(String throttleGroupType , String throttleGroup,
+ String[] binNames)
+ throws InterruptedException;
- /** Release permission to use a connection. This presumes that obtainConnectionPermission()
- * was called earlier in the same thread and was successful.
- *@param currentTime is the current time, in ms. since epoch.
+ /** Determine whether to release a pooled connection. This method returns the number of bins
+ * where the outstanding connection exceeds current quotas, indicating whether at least one with the specified
+ * characteristics should be released.
+ * NOTE WELL: This method cannot judge which is the best connection to be released to meet
+ * quotas. The caller needs to do that based on the highest number of bins matched.
+ *@param throttleGroupType is the throttle group type.
+ *@param throttleGroup is the throttle group.
+ *@param binNames is the set of bin names to throttle for, within the throttle group.
+ *@return the number of bins that are over quota, or zero if none of them are.
*/
- public void releaseConnectionPermission(long currentTime)
- throws ManifoldCFException;
+ public int overConnectionQuotaCount(String throttleGroupType, String throttleGroup, String[] binNames);
+
+ /** Release permission to use one connection. This presumes that obtainConnectionPermission()
+ * was called earlier by someone and was successful.
+ *@param throttleGroupType is the throttle group type.
+ *@param throttleGroup is the throttle group.
+ *@param binNames is the set of bin names to throttle for, within the throttle group.
+ */
+ public void releaseConnectionPermission(String throttleGroupType, String throttleGroup, String[] binNames);
- /** Poll periodically.
+ /** Poll periodically, to update cluster-wide statistics and allocation.
+ *@param throttleGroupType is the throttle group type to update.
*/
- public void poll()
+ public void poll(String throttleGroupType)
throws ManifoldCFException;
/** Free unused resources.
Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java?rev=1550704&r1=1550703&r2=1550704&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java Fri Dec 13 12:21:00 2013
@@ -28,21 +28,6 @@ public interface IFetchThrottler
{
public static final String _rcsid = "@(#)$Id$";
- /** Get the throttle group for this fetch throttler.
- *@return the throttle group.
- */
- public String getThrottleGroup();
-
- /** Get the throttle specification for this fetch throttler.
- *@return the throttle specification.
- */
- public IThrottleSpec getThrottleSpecification();
-
- /** Get the bin names for this fetch throttler.
- *@return the bin names.
- */
- public String[] getBinNames();
-
/** Get permission to fetch a document. This grants permission to start
* fetching a single document, within the connection that has already been
* granted permission that created this object. When done (or aborting), call
Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionThrottler.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionThrottler.java?rev=1550704&r1=1550703&r2=1550704&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionThrottler.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionThrottler.java Fri Dec 13 12:21:00 2013
@@ -19,6 +19,7 @@
package org.apache.manifoldcf.core.throttler;
import org.apache.manifoldcf.core.interfaces.*;
+import java.util.*;
/** An implementation of IConnectionThrottler, which establishes a JVM-wide
* pool of throttlers that can be used as a resource by any connector that needs
@@ -30,14 +31,7 @@ public class ConnectionThrottler impleme
/** The thread context */
protected final IThreadContext threadContext;
-
- // Parameters of the current connection
-
- protected String throttleGroup = null;
- protected IThrottleSpec throttleSpec = null;
- protected String[] binNames = null;
-
-
+
/** The actual static pool */
protected final static Throttler throttler = new Throttler();
@@ -48,54 +42,96 @@ public class ConnectionThrottler impleme
this.threadContext = threadContext;
}
+ /** Get all existing throttle groups for a throttle group type.
+ * The throttle group type typically describes a connector class, while the throttle group represents
+ * a namespace of bin names specific to that connector class.
+ *@param throttleGroupType is the throttle group type.
+ *@return the set of throttle groups for that group type.
+ */
+ @Override
+ public Set<String> getThrottleGroups(String throttleGroupType)
+ {
+ // MHL
+ return null;
+ }
+
+ /** Remove a throttle group.
+ *@param throttleGroupType is the throttle group type.
+ *@param throttleGroup is the throttle group.
+ */
+ @Override
+ public void removeThrottleGroup(String throttleGroupType, String throttleGroup)
+ {
+ // MHL
+ }
+
+ /** Set or update throttle specification for a throttle group. This creates the
+ * throttle group if it does not yet exist.
+ *@param throttleGroupType is the throttle group type.
+ *@param throttleGroup is the throttle group.
+ *@param throttleSpec is the desired throttle specification object.
+ */
+ @Override
+ public void updateThrottleSpecification(String throttleGroupType, String throttleGroup, IThrottleSpec throttleSpec)
+ {
+ // MHL
+ }
+
/** Get permission to use a connection, which is described by the passed array of bin names.
- * The connection can be used multiple
- * times until the releaseConnectionPermission() method is called.
+ * This method may block until a connection slot is available.
+ * The connection can be used multiple times until the releaseConnectionPermission() method is called.
+ * This persistence feature is meant to allow connections to be pooled locally by the caller.
+ *@param throttleGroupType is the throttle group type.
*@param throttleGroup is the throttle group.
- *@param throttleSpec is the throttle specification to use for the throttle group,
*@param binNames is the set of bin names to throttle for, within the throttle group.
- *@param currentTime is the current time, in ms. since epoch.
- *@return the fetch throttler to use when performing fetches from this connection.
+ *@return the fetch throttler to use when performing fetches from the corresponding connection.
*/
@Override
- public IFetchThrottler obtainConnectionPermission(String throttleGroup,
- IThrottleSpec throttleSpec, String[] binNames, long currentTime)
- throws ManifoldCFException
+ public IFetchThrottler obtainConnectionPermission(String throttleGroupType , String throttleGroup,
+ String[] binNames)
+ throws InterruptedException
+ {
+ // MHL
+ return null;
+ }
+
+ /** Determine whether to release a pooled connection. This method returns the number of bins
+ * where the outstanding connection exceeds current quotas, indicating whether at least one with the specified
+ * characteristics should be released.
+ * NOTE WELL: This method cannot judge which is the best connection to be released to meet
+ * quotas. The caller needs to do that based on the highest number of bins matched.
+ *@param throttleGroupType is the throttle group type.
+ *@param throttleGroup is the throttle group.
+ *@param binNames is the set of bin names to throttle for, within the throttle group.
+ *@return the number of bins that are over quota, or zero if none of them are.
+ */
+ @Override
+ public int overConnectionQuotaCount(String throttleGroupType, String throttleGroup, String[] binNames)
{
- if (throttleGroup != null || throttleSpec != null || binNames != null)
- throw new IllegalStateException("Already an active connection in this thread; must finish that one first");
- this.throttleGroup = throttleGroup;
- this.binNames = binNames;
- this.throttleSpec = throttleSpec;
-
- return throttler.obtainConnectionPermission(threadContext, throttleGroup,
- throttleSpec, binNames, currentTime);
+ // MHL
+ return 0;
}
- /** Release permission to use a connection. This presumes that obtainConnectionPermission()
- * was called earlier in the same thread and was successful.
- *@param currentTime is the current time, in ms. since epoch.
+ /** Release permission to use one connection. This presumes that obtainConnectionPermission()
+ * was called earlier by someone and was successful.
+ *@param throttleGroupType is the throttle group type.
+ *@param throttleGroup is the throttle group.
+ *@param binNames is the set of bin names to throttle for, within the throttle group.
*/
@Override
- public void releaseConnectionPermission(long currentTime)
- throws ManifoldCFException
+ public void releaseConnectionPermission(String throttleGroupType, String throttleGroup, String[] binNames)
{
- if (throttleGroup == null || throttleSpec == null || binNames == null)
- throw new IllegalStateException("No active connection in this thread!");
- throttler.releaseConnectionPermission(threadContext, throttleGroup,
- throttleSpec, binNames, currentTime);
- this.throttleGroup = null;
- this.binNames = null;
- this.throttleSpec = null;
+ // MHL
}
-
- /** Poll periodically.
+
+ /** Poll periodically, to update cluster-wide statistics and allocation.
+ *@param throttleGroupType is the throttle group type to update.
*/
@Override
- public void poll()
+ public void poll(String throttleGroupType)
throws ManifoldCFException
{
- throttler.poll(threadContext);
+ // MHL
}
/** Free unused resources.
Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java?rev=1550704&r1=1550703&r2=1550704&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java Fri Dec 13 12:21:00 2013
@@ -42,8 +42,8 @@ public class Throttler
/** The service type prefix for throttle pools */
protected final static String serviceTypePrefix = "_THROTTLEPOOL_";
- /** Pool hash table. Keyed by throttle group name; value is Pool */
- protected final Map<String,Pool> poolHash = new HashMap<String,Pool>();
+ /** Throttle group hash table. Keyed by throttle group type, value is throttling groups */
+ protected final Map<String,ThrottlingGroups> throttleGroupsHash = new HashMap<String,ThrottlingGroups>();
/** Create a throttler instance. Usually there will be one of these per connector
* type that needs throttling.
@@ -52,69 +52,97 @@ public class Throttler
{
}
- /** Check if throttle group name is still valid.
- * This can be overridden, but right now nobody does it.
+ /** Get all existing throttle groups for a throttle group type.
+ * The throttle group type typically describes a connector class, while the throttle group represents
+ * a namespace of bin names specific to that connector class.
+ *@param throttleGroupType is the throttle group type.
+ *@return the set of throttle groups for that group type.
*/
- protected boolean isThrottleGroupValid(IThreadContext threadContext, String throttleGroupName)
- throws ManifoldCFException
+ public Set<String> getThrottleGroups(String throttleGroupType)
{
- return true;
+ // MHL
+ return null;
+ }
+
+ /** Remove a throttle group.
+ *@param throttleGroupType is the throttle group type.
+ *@param throttleGroup is the throttle group.
+ */
+ public void removeThrottleGroup(String throttleGroupType, String throttleGroup)
+ {
+ // MHL
}
+ /** Set or update throttle specification for a throttle group. This creates the
+ * throttle group if it does not yet exist.
+ *@param throttleGroupType is the throttle group type.
+ *@param throttleGroup is the throttle group.
+ *@param throttleSpec is the desired throttle specification object.
+ */
+ public void updateThrottleSpecification(String throttleGroupType, String throttleGroup, IThrottleSpec throttleSpec)
+ {
+ // MHL
+ }
+
/** Get permission to use a connection, which is described by the passed array of bin names.
- * The connection can be used multiple
- * times until the releaseConnectionPermission() method is called.
- *@param threadContext is the thread context.
+ * This method may block until a connection slot is available.
+ * The connection can be used multiple times until the releaseConnectionPermission() method is called.
+ * This persistence feature is meant to allow connections to be pooled locally by the caller.
+ *@param throttleGroupType is the throttle group type.
*@param throttleGroup is the throttle group.
- *@param throttleSpec is the throttle specification to use for the throttle group,
*@param binNames is the set of bin names to throttle for, within the throttle group.
- *@param currentTime is the current time, in ms. since epoch.
- *@return the fetch throttler to use for fetches with the obtained connection.
+ *@return the fetch throttler to use when performing fetches from the corresponding connection.
*/
- public IFetchThrottler obtainConnectionPermission(IThreadContext threadContext, String throttleGroup,
- IThrottleSpec throttleSpec, String[] binNames, long currentTime)
- throws ManifoldCFException
+ public IFetchThrottler obtainConnectionPermission(String throttleGroupType , String throttleGroup,
+ String[] binNames)
+ throws InterruptedException
{
// MHL
return null;
}
- /** Release permission to use a connection. This presumes that obtainConnectionPermission()
- * was called earlier in the same thread and was successful.
- *@param threadContext is the thread context.
- *@param throttleGroup is the throttle group name.
- *@param throttleSpec is the throttle specification to use for the throttle group,
- *@param currentTime is the current time, in ms. since epoch.
+ /** Determine whether to release a pooled connection. This method returns the number of bins
+ * where the outstanding connection exceeds current quotas, indicating whether at least one with the specified
+ * characteristics should be released.
+ * NOTE WELL: This method cannot judge which is the best connection to be released to meet
+ * quotas. The caller needs to do that based on the highest number of bins matched.
+ *@param throttleGroupType is the throttle group type.
+ *@param throttleGroup is the throttle group.
+ *@param binNames is the set of bin names to throttle for, within the throttle group.
+ *@return the number of bins that are over quota, or zero if none of them are.
*/
- public void releaseConnectionPermission(IThreadContext threadContext, String throttleGroup,
- IThrottleSpec throttleSpec, String[] binNames, long currentTime)
- throws ManifoldCFException
+ public int overConnectionQuotaCount(String throttleGroupType, String throttleGroup, String[] binNames)
+ {
+ // MHL
+ return 0;
+ }
+
+ /** Release permission to use one connection. This presumes that obtainConnectionPermission()
+ * was called earlier by someone and was successful.
+ *@param throttleGroupType is the throttle group type.
+ *@param throttleGroup is the throttle group.
+ *@param binNames is the set of bin names to throttle for, within the throttle group.
+ */
+ public void releaseConnectionPermission(String throttleGroupType, String throttleGroup, String[] binNames)
{
// MHL
}
/** Poll periodically.
*/
- public void poll(IThreadContext threadContext)
+ public void poll(IThreadContext threadContext, String throttleGroupType)
throws ManifoldCFException
{
- // Go through the whole pool and notify everyone
- synchronized (poolHash)
+ // Find the right pool, and poll that
+ ThrottlingGroups tg;
+ synchronized (throttleGroupsHash)
{
- Iterator<String> iter = poolHash.keySet().iterator();
- while (iter.hasNext())
- {
- String throttleGroup = iter.next();
- Pool p = poolHash.get(throttleGroup);
- if (isThrottleGroupValid(threadContext,throttleGroup))
- p.poll(threadContext);
- else
- {
- p.destroy(threadContext);
- iter.remove();
- }
- }
+ tg = throttleGroupsHash.get(throttleGroupType);
}
+
+ if (tg != null)
+ tg.poll(threadContext);
+
}
/** Free unused resources.
@@ -123,12 +151,12 @@ public class Throttler
throws ManifoldCFException
{
// Go through the whole pool and clean it out
- synchronized (poolHash)
+ synchronized (throttleGroupsHash)
{
- Iterator<Pool> iter = poolHash.values().iterator();
+ Iterator<ThrottlingGroups> iter = throttleGroupsHash.values().iterator();
while (iter.hasNext())
{
- Pool p = iter.next();
+ ThrottlingGroups p = iter.next();
p.freeUnusedResources(threadContext);
}
}
@@ -140,12 +168,12 @@ public class Throttler
throws ManifoldCFException
{
// Go through the whole pool and clean it out
- synchronized (poolHash)
+ synchronized (throttleGroupsHash)
{
- Iterator<Pool> iter = poolHash.values().iterator();
+ Iterator<ThrottlingGroups> iter = throttleGroupsHash.values().iterator();
while (iter.hasNext())
{
- Pool p = iter.next();
+ ThrottlingGroups p = iter.next();
p.destroy(threadContext);
iter.remove();
}
@@ -154,15 +182,69 @@ public class Throttler
// Protected methods and classes
- protected String buildServiceTypeName(String throttleGroupName)
+ protected String buildServiceTypeName(String throttlingGroupType, String throttleGroupName)
{
- return serviceTypePrefix + throttleGroupName;
+ return serviceTypePrefix + throttlingGroupType + "_" + throttleGroupName;
}
- /** This class represents a value in the pool hash, which corresponds to a given key.
+ /** This class represents a throttling group pool */
+ protected class ThrottlingGroups
+ {
+ /** The throttling group type for this throttling group pool */
+ protected final String throttlingGroupTypeName;
+ /** The pool of individual throttle group services for this pool, keyed by throttle group name */
+ protected final Map<String,ThrottlingGroup> groups = new HashMap<String,ThrottlingGroup>();
+
+ public ThrottlingGroups(String throttlingGroupTypeName)
+ {
+ this.throttlingGroupTypeName = throttlingGroupTypeName;
+ }
+
+ // MHL
+
+ /** Poll this set of throttle groups */
+ public synchronized void poll(IThreadContext threadContext)
+ throws ManifoldCFException
+ {
+ Iterator<String> iter = groups.keySet().iterator();
+ while (iter.hasNext())
+ {
+ String throttleGroup = iter.next();
+ ThrottlingGroup p = groups.get(throttleGroup);
+ p.poll(threadContext);
+ }
+ }
+
+ /** Free unused resources */
+ public synchronized void freeUnusedResources(IThreadContext threadContext)
+ throws ManifoldCFException
+ {
+ Iterator<ThrottlingGroup> iter = groups.values().iterator();
+ while (iter.hasNext())
+ {
+ ThrottlingGroup g = iter.next();
+ g.freeUnusedResources(threadContext);
+ }
+ }
+
+ /** Destroy and shutdown all */
+ public synchronized void destroy(IThreadContext threadContext)
+ throws ManifoldCFException
+ {
+ Iterator<ThrottlingGroup> iter = groups.values().iterator();
+ while (iter.hasNext())
+ {
+ ThrottlingGroup p = iter.next();
+ p.destroy(threadContext);
+ iter.remove();
+ }
+ }
+ }
+
+ /** This class represents a throttling group, of a specific throttling group type.
*/
- protected class Pool
+ protected class ThrottlingGroup
{
/** Whether this pool is alive */
protected boolean isAlive = true;
@@ -175,10 +257,10 @@ public class Throttler
/** Constructor
*/
- public Pool(IThreadContext threadContext, String throttleGroup, IThrottleSpec throttleSpec)
+ public ThrottlingGroup(IThreadContext threadContext, String throttlingGroupType, String throttleGroup, IThrottleSpec throttleSpec)
throws ManifoldCFException
{
- this.serviceTypeName = buildServiceTypeName(throttleGroup);
+ this.serviceTypeName = buildServiceTypeName(throttlingGroupType, throttleGroup);
this.throttleSpec = throttleSpec;
// Now, register and activate service anonymously, and record the service name we get.
ILockManager lockManager = LockManagerFactory.make(threadContext);