You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/14 10:12:52 UTC
svn commit: r1550906 - in
/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core:
interfaces/ throttler/
Author: kwright
Date: Sat Dec 14 09:12:51 2013
New Revision: 1550906
URL: http://svn.apache.org/r1550906
Log:
More development
Modified:
manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IConnectionThrottler.java
manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java
manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IStreamThrottler.java
manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionThrottler.java
manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java
manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java
Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IConnectionThrottler.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IConnectionThrottler.java?rev=1550906&r1=1550905&r2=1550906&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IConnectionThrottler.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IConnectionThrottler.java Sat Dec 14 09:12:51 2013
@@ -60,7 +60,7 @@ public interface IConnectionThrottler
*@param throttleGroupType is the throttle group type.
*@param throttleGroup is the throttle group.
*@param binNames is the set of bin names to throttle for, within the throttle group.
- *@return the fetch throttler to use when performing fetches from the corresponding connection.
+ *@return the fetch throttler to use when performing fetches from the corresponding connection, or null if the system is being shut down.
*/
public IFetchThrottler obtainConnectionPermission(String throttleGroupType , String throttleGroup,
String[] binNames)
Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java?rev=1550906&r1=1550905&r2=1550906&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java Sat Dec 14 09:12:51 2013
@@ -34,7 +34,7 @@ public interface IFetchThrottler
* releaseFetchDocumentPermission() to note the completion of the document
* fetch activity.
*@param currentTime is the current time, in ms. since epoch.
- *@return the stream throttler to use to throttle the actual data access.
+ *@return the stream throttler to use to throttle the actual data access, or null if the system is being shut down.
*/
public IStreamThrottler obtainFetchDocumentPermission(long currentTime)
throws InterruptedException;
Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IStreamThrottler.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IStreamThrottler.java?rev=1550906&r1=1550905&r2=1550906&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IStreamThrottler.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IStreamThrottler.java Sat Dec 14 09:12:51 2013
@@ -33,8 +33,9 @@ public interface IStreamThrottler
* to this specific interface object, so it is unnecessary to include them here.
*@param currentTime is the current time, in ms. since epoch.
*@param byteCount is the number of bytes to get permissions to read.
+ *@return true if the wait took place as planned, or false if the system is being shut down.
*/
- public void obtainReadPermission(long currentTime, int byteCount)
+ public boolean obtainReadPermission(long currentTime, int byteCount)
throws InterruptedException;
/** Note the completion of the read of a block of bytes. Call this after
Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionThrottler.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionThrottler.java?rev=1550906&r1=1550905&r2=1550906&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionThrottler.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionThrottler.java Sat Dec 14 09:12:51 2013
@@ -86,14 +86,21 @@ public class ConnectionThrottler impleme
*@param throttleGroupType is the throttle group type.
*@param throttleGroup is the throttle group.
*@param binNames is the set of bin names to throttle for, within the throttle group.
- *@return the fetch throttler to use when performing fetches from the corresponding connection.
+ *@return the fetch throttler to use when performing fetches from the corresponding connection, or null if the system is being shut down.
*/
@Override
public IFetchThrottler obtainConnectionPermission(String throttleGroupType , String throttleGroup,
String[] binNames)
throws ManifoldCFException
{
- return throttler.obtainConnectionPermission(threadContext, throttleGroupType, throttleGroup, binNames);
+ try
+ {
+ return throttler.obtainConnectionPermission(throttleGroupType, throttleGroup, binNames);
+ }
+ catch (InterruptedException e)
+ {
+ throw new ManifoldCFException(e.getMessage(),ManifoldCFException.INTERRUPTED);
+ }
}
/** Determine whether to release a pooled connection. This method returns the number of bins
@@ -110,7 +117,7 @@ public class ConnectionThrottler impleme
public int overConnectionQuotaCount(String throttleGroupType, String throttleGroup, String[] binNames)
throws ManifoldCFException
{
- return throttler.overConnectionQuotaCount(threadContext, throttleGroupType, throttleGroup, binNames);
+ return throttler.overConnectionQuotaCount(throttleGroupType, throttleGroup, binNames);
}
/** Release permission to use one connection. This presumes that obtainConnectionPermission()
@@ -123,7 +130,7 @@ public class ConnectionThrottler impleme
public void releaseConnectionPermission(String throttleGroupType, String throttleGroup, String[] binNames)
throws ManifoldCFException
{
- throttler.releaseConnectionPermission(threadContext, throttleGroupType, throttleGroup, binNames);
+ throttler.releaseConnectionPermission(throttleGroupType, throttleGroup, binNames);
}
/** Poll periodically, to update cluster-wide statistics and allocation.
Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java?rev=1550906&r1=1550905&r2=1550906&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java Sat Dec 14 09:12:51 2013
@@ -81,8 +81,8 @@ public class ThrottleBin
protected long seriesStartTime = -1L;
/** Total actual bytes read in this series; this includes fetches in progress */
protected long totalBytesRead = -1L;
- /** The minimum milliseconds per byte per server */
- protected double minimumMillisecondsPerBytePerServer = Double.MAX_VALUE;
+ /** The minimum milliseconds per byte */
+ protected double minimumMillisecondsPerByte = Double.MAX_VALUE;
/** Constructor. */
public ThrottleBin(String binName)
@@ -97,9 +97,9 @@ public class ThrottleBin
}
/** Update minimumMillisecondsPerByte */
- public void updateMinimumMillisecondsPerBytePerServer(double min)
+ public void updateMinimumMillisecondsPerByte(double min)
{
- this.minimumMillisecondsPerBytePerServer = min;
+ this.minimumMillisecondsPerByte = min;
}
/** Note the start of a fetch operation for a bin. Call this method just before the actual stream access begins.
@@ -172,7 +172,7 @@ public class ThrottleBin
long estimatedTime = (long)(rateEstimate * (double)byteCount);
// Figure out how long the total byte count should take, to meet the constraint
- long desiredEndTime = seriesStartTime + (long)(((double)totalBytesRead) * minimumMillisecondsPerBytePerServer);
+ long desiredEndTime = seriesStartTime + (long)(((double)totalBytesRead) * minimumMillisecondsPerByte);
// The wait time is the difference between our desired end time, minus the estimated time to read the data, and the
// current time. But it can't be negative.
Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java?rev=1550906&r1=1550905&r2=1550906&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java Sat Dec 14 09:12:51 2013
@@ -52,6 +52,17 @@ public class Throttler
{
}
+ // There are a lot of synchronizers to coordinate here. They are indeed hierarchical. It is not possible to simply
+ // throw a synchronizer at every level, and require that we hold all of them, because when we wait somewhere in the
+ // inner level, we will continue to hold locks and block access to all the outer levels.
+ //
+ // Instead, I've opted for a model whereby individual resources are protected. This is tricky to coordinate, though,
+ // because (for instance) after a resource has been removed from the hash table, it had better be cleaned up
+ // thoroughly before the outer lock is released, or two versions of the resource might wind up coming into existence.
+ // The general rule is therefore:
+ // (1) Creation or deletion of resources involves locking the parent where the resource is being added or removed
+ // (2) Anything that waits CANNOT also add or remove.
+
/** Get all existing throttle groups for a throttle group type.
* The throttle group type typically describes a connector class, while the throttle group represents
* a namespace of bin names specific to that connector class.
@@ -74,15 +85,14 @@ public class Throttler
public void removeThrottleGroup(IThreadContext threadContext, String throttleGroupType, String throttleGroup)
throws ManifoldCFException
{
- ThrottlingGroups tg;
+ // Removal. Lock the whole hierarchy.
synchronized (throttleGroupsHash)
{
- tg = throttleGroupsHash.get(throttleGroupType);
- }
-
- if (tg != null)
- {
- tg.removeThrottleGroup(threadContext, throttleGroup);
+ ThrottlingGroups tg = throttleGroupsHash.get(throttleGroupType);
+ if (tg != null)
+ {
+ tg.removeThrottleGroup(threadContext, throttleGroup);
+ }
}
}
@@ -95,18 +105,17 @@ public class Throttler
public void updateThrottleSpecification(IThreadContext threadContext, String throttleGroupType, String throttleGroup, IThrottleSpec throttleSpec)
throws ManifoldCFException
{
- ThrottlingGroups tg;
+ // Potential addition. Lock the whole hierarchy.
synchronized (throttleGroupsHash)
{
- tg = throttleGroupsHash.get(throttleGroupType);
+ ThrottlingGroups tg = throttleGroupsHash.get(throttleGroupType);
if (tg == null)
{
tg = new ThrottlingGroups(throttleGroupType);
throttleGroupsHash.put(throttleGroupType, tg);
}
+ tg.updateThrottleSpecification(threadContext, throttleGroup, throttleSpec);
}
-
- tg.updateThrottleSpecification(threadContext, throttleGroup, throttleSpec);
}
/** Get permission to use a connection, which is described by the passed array of bin names.
@@ -116,14 +125,26 @@ public class Throttler
*@param throttleGroupType is the throttle group type.
*@param throttleGroup is the throttle group.
*@param binNames is the set of bin names to throttle for, within the throttle group.
- *@return the fetch throttler to use when performing fetches from the corresponding connection.
+ *@return the fetch throttler to use when performing fetches from the corresponding connection, or null if the system is being shut down.
*/
- public IFetchThrottler obtainConnectionPermission(IThreadContext threadContext, String throttleGroupType , String throttleGroup,
+ public IFetchThrottler obtainConnectionPermission(String throttleGroupType , String throttleGroup,
String[] binNames)
- throws ManifoldCFException
+ throws InterruptedException
{
- // MHL
- return null;
+ // This method may wait at the innermost level. We therefore do not lock the hierarchy, BUT
+ // we must code for the possibility that the hierarchy is fluid, and may dissolve. Under such
+ // conditions we could simply retry. But in this case, the hierarchy has state: there is an
+ // explicit creation step (updateThrottleSpecification) that must be called to establish the
+ // innermost object and set it up correctly. It is therefore meaningful if we do not find
+ // the expected structure: we return null in that case.
+ ThrottlingGroups tg;
+ synchronized (throttleGroupsHash)
+ {
+ tg = throttleGroupsHash.get(throttleGroupType);
+ }
+ if (tg == null)
+ return null;
+ return tg.obtainConnectionPermission(throttleGroup, binNames);
}
/** Determine whether to release a pooled connection. This method returns the number of bins
@@ -136,11 +157,14 @@ public class Throttler
*@param binNames is the set of bin names to throttle for, within the throttle group.
*@return the number of bins that are over quota, or zero if none of them are.
*/
- public int overConnectionQuotaCount(IThreadContext threadContext, String throttleGroupType, String throttleGroup, String[] binNames)
- throws ManifoldCFException
+ public int overConnectionQuotaCount(String throttleGroupType, String throttleGroup, String[] binNames)
{
- // MHL
- return 0;
+ // No waiting, so it is OK to lock everything.
+ synchronized (throttleGroupsHash)
+ {
+ // MHL
+ return 0;
+ }
}
/** Release permission to use one connection. This presumes that obtainConnectionPermission()
@@ -149,10 +173,13 @@ public class Throttler
*@param throttleGroup is the throttle group.
*@param binNames is the set of bin names to throttle for, within the throttle group.
*/
- public void releaseConnectionPermission(IThreadContext threadContext, String throttleGroupType, String throttleGroup, String[] binNames)
- throws ManifoldCFException
+ public void releaseConnectionPermission(String throttleGroupType, String throttleGroup, String[] binNames)
{
- // MHL
+ // No waiting, so it is ok to lock the entire tree.
+ synchronized (throttleGroupsHash)
+ {
+ // MHL
+ }
}
/** Poll periodically.
@@ -160,15 +187,13 @@ public class Throttler
public void poll(IThreadContext threadContext, String throttleGroupType)
throws ManifoldCFException
{
- // Find the right pool, and poll that
- ThrottlingGroups tg;
+ // No waiting, so lock the entire tree.
synchronized (throttleGroupsHash)
{
- tg = throttleGroupsHash.get(throttleGroupType);
+ ThrottlingGroups tg = throttleGroupsHash.get(throttleGroupType);
+ if (tg != null)
+ tg.poll(threadContext);
}
-
- if (tg != null)
- tg.poll(threadContext);
}
@@ -177,6 +202,7 @@ public class Throttler
public void freeUnusedResources(IThreadContext threadContext)
throws ManifoldCFException
{
+ // This potentially affects the entire hierarchy.
// Go through the whole pool and clean it out
synchronized (throttleGroupsHash)
{
@@ -194,6 +220,7 @@ public class Throttler
public void destroy(IThreadContext threadContext)
throws ManifoldCFException
{
+ // This affects the entire hierarchy, so lock the whole thing.
// Go through the whole pool and clean it out
synchronized (throttleGroupsHash)
{
@@ -249,8 +276,25 @@ public class Throttler
}
}
+ /** Obtain connection permission.
+ *@return null if the hierarchy has changed!
+ */
+ public IFetchThrottler obtainConnectionPermission(String throttleGroup, String[] binNames)
+ throws InterruptedException
+ {
+ // Can't lock the hierarchy here.
+ ThrottlingGroup g;
+ synchronized (groups)
+ {
+ g = groups.get(throttleGroup);
+ }
+ if (g == null)
+ return null;
+ return g.obtainConnectionPermission(binNames);
+ }
+
/** Remove specified throttle group */
- public synchronized void removeThrottleGroup(IThreadContext threadContext, String throttleGroup)
+ public void removeThrottleGroup(IThreadContext threadContext, String throttleGroup)
throws ManifoldCFException
{
// Must synch the whole thing, because otherwise there would be a risk of someone recreating the
@@ -265,7 +309,8 @@ public class Throttler
}
}
- /** Poll this set of throttle groups */
+ /** Poll this set of throttle groups.
+ */
public void poll(IThreadContext threadContext)
throws ManifoldCFException
{
@@ -360,7 +405,20 @@ public class Throttler
this.throttleSpec = throttleSpec;
}
- // MHL
+ /** Obtain connection permission.
+ *@return null if we are marked as 'not alive'.
+ */
+ public IFetchThrottler obtainConnectionPermission(String[] binNames)
+ throws InterruptedException
+ {
+ // First, make sure all the bins exist
+ // MHL
+ // Reserve a slot in all bins
+ // MHL
+ // Wait on each reserved bin in turn
+ // MHL
+ return null;
+ }
/** Call this periodically.
*/
@@ -391,7 +449,7 @@ public class Throttler
{
for (ThrottleBin bin : throttleBins.values())
{
- bin.updateMinimumMillisecondsPerBytePerServer(throttleSpec.getMinimumMillisecondsPerByte(bin.getBinName()));
+ bin.updateMinimumMillisecondsPerByte(throttleSpec.getMinimumMillisecondsPerByte(bin.getBinName()));
}
}