You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/18 22:01:02 UTC
svn commit: r1552096 - in /manifoldcf/trunk: ./ framework/core/
framework/core/src/main/java/org/apache/manifoldcf/core/throttler/
Author: kwright
Date: Wed Dec 18 21:01:01 2013
New Revision: 1552096
URL: http://svn.apache.org/r1552096
Log:
Add cross-thread throttling support. Fix for CONNECTORS-829.
Modified:
manifoldcf/trunk/ (props changed)
manifoldcf/trunk/framework/core/pom.xml
manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java
manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java
manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java
Propchange: manifoldcf/trunk/
------------------------------------------------------------------------------
Merged /manifoldcf/branches/CONNECTORS-829:r1551753-1552095
Modified: manifoldcf/trunk/framework/core/pom.xml
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/core/pom.xml?rev=1552096&r1=1552095&r2=1552096&view=diff
==============================================================================
--- manifoldcf/trunk/framework/core/pom.xml (original)
+++ manifoldcf/trunk/framework/core/pom.xml Wed Dec 18 21:01:01 2013
@@ -92,5 +92,23 @@
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
</dependency>
+ <dependency>
+ <groupId>postgresql</groupId>
+ <artifactId>postgresql</artifactId>
+ <version>${postgresql.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hsqldb</groupId>
+ <artifactId>hsqldb</artifactId>
+ <version>${hsqldb.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derby</artifactId>
+ <version>${derby.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
Modified: manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java?rev=1552096&r1=1552095&r2=1552096&view=diff
==============================================================================
--- manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java (original)
+++ manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java Wed Dec 18 21:01:01 2013
@@ -21,6 +21,7 @@ package org.apache.manifoldcf.core.throt
import org.apache.manifoldcf.core.interfaces.*;
import org.apache.manifoldcf.core.system.ManifoldCF;
import java.util.concurrent.atomic.*;
+import java.util.*;
/** Connection tracking for a bin.
*
@@ -50,8 +51,14 @@ public class ConnectionBin
protected final String serviceTypeName;
/** The (anonymous) service name */
protected final String serviceName;
+ /** The target calculation lock name */
+ protected final String targetCalcLockName;
+
/** This is the maximum number of active connections allowed for this bin */
protected int maxActiveConnections = 0;
+
+ /** This is the local maximum number of active connections allowed for this bin */
+ protected int localMax = 0;
/** This is the number of connections in this bin that have been reserved - that is, they
* are promised to various callers, but those callers have not yet committed to obtaining them. */
protected int reservedConnections = 0;
@@ -62,22 +69,34 @@ public class ConnectionBin
/** The service type prefix for connection bins */
protected final static String serviceTypePrefix = "_CONNECTIONBIN_";
+ /** The target calculation lock prefix */
+ protected final static String targetCalcLockPrefix = "_CONNECTIONBINTARGET_";
+
+ /** Random number */
+ protected final static Random randomNumberGenerator = new Random();
+
/** Constructor. */
public ConnectionBin(IThreadContext threadContext, String throttlingGroupName, String binName)
throws ManifoldCFException
{
this.binName = binName;
this.serviceTypeName = buildServiceTypeName(throttlingGroupName, binName);
+ this.targetCalcLockName = buildTargetCalcLockName(throttlingGroupName, binName);
// Now, register and activate service anonymously, and record the service name we get.
ILockManager lockManager = LockManagerFactory.make(threadContext);
this.serviceName = lockManager.registerServiceBeginServiceActivity(serviceTypeName, null, null);
}
- protected String buildServiceTypeName(String throttlingGroupName, String binName)
+ protected static String buildServiceTypeName(String throttlingGroupName, String binName)
{
return serviceTypePrefix + throttlingGroupName + "_" + binName;
}
+ protected static String buildTargetCalcLockName(String throttlingGroupName, String binName)
+ {
+ return targetCalcLockPrefix + throttlingGroupName + "_" + binName;
+ }
+
/** Get the bin name. */
public String getBinName()
{
@@ -88,9 +107,8 @@ public class ConnectionBin
*/
public synchronized void updateMaxActiveConnections(int maxActiveConnections)
{
- // Update the number and wake up any waiting threads; they will take care of everything.
+ // Update the number; the poller will wake up any waiting threads.
this.maxActiveConnections = maxActiveConnections;
- notifyAll();
}
/** Wait for a connection to become available, in the context of an existing connection pool.
@@ -120,7 +138,7 @@ public class ConnectionBin
poolCount.set(currentPoolCount - 1);
return IConnectionThrottler.CONNECTION_FROM_POOL;
}
- if (inUseConnections + reservedConnections < maxActiveConnections)
+ if (inUseConnections + reservedConnections < localMax)
{
reservedConnections++;
return IConnectionThrottler.CONNECTION_FROM_CREATION;
@@ -166,7 +184,7 @@ public class ConnectionBin
public synchronized boolean shouldReturnedConnectionBeDestroyed()
{
// We don't count reserved connections here because those are not yet committed
- return inUseConnections > maxActiveConnections;
+ return inUseConnections > localMax;
}
public static final int CONNECTION_DESTROY = 0;
@@ -189,7 +207,7 @@ public class ConnectionBin
// return it, and no harm done.
poolCount.set(currentPoolCount-1);
// We don't count reserved connections here because those are not yet committed.
- if (inUseConnections > maxActiveConnections)
+ if (inUseConnections > localMax)
{
return CONNECTION_DESTROY;
}
@@ -216,6 +234,7 @@ public class ConnectionBin
public synchronized void undoPooledConnectionDecision(AtomicInteger poolCount)
{
poolCount.set(poolCount.get() + 1);
+ notifyAll();
}
/** Note a connection returned to the pool.
@@ -239,7 +258,89 @@ public class ConnectionBin
public synchronized void poll(IThreadContext threadContext)
throws ManifoldCFException
{
- // MHL
+ // The meat of the cross-cluster apportionment algorithm goes here!
+ // Each service posts two global numbers: "in-use" and "target". At no time does a service *ever* post a "target"
+ // that, together with all other active service targets, is in excess of the max. Also, at no time does a service post
+ // a target that, when added to the other "in-use" values, exceeds the max. If the "in-use" values everywhere else
+ // already equal or exceed the max, then the target will be zero.
+ // The target quota is calculated as follows:
+ // (1) Target is summed, excluding ours. This is GlobalTarget.
+ // (2) In-use is summed, excluding ours. This is GlobalInUse.
+ // (3) Our MaximumTarget is computed, which is Maximum - GlobalTarget or Maximum - GlobalInUse, whichever is
+ // smaller, but never less than zero.
+ // (4) Our FairTarget is computed. The FairTarget divides the Maximum by the number of services, and adds
+ // 1 randomly based on the remainder.
+ // (5) We compute OptimalTarget as follows: We start with current local target. If current local target
+ // exceeds current local in-use count, we adjust OptimalTarget downward by one. Otherwise we increase it
+ // by a quarter of the Maximum (but never by less than one), so that it ramps up quickly.
+ // (6) Finally, we compute Target by taking the minimum of MaximumTarget, FairTarget, and OptimalTarget.
+
+ ILockManager lockManager = LockManagerFactory.make(threadContext);
+ lockManager.enterWriteLock(targetCalcLockName);
+ try
+ {
+ // Compute MaximumTarget
+ SumClass sumClass = new SumClass(serviceName);
+ lockManager.scanServiceData(serviceTypeName, sumClass);
+ //System.out.println("numServices = "+sumClass.getNumServices()+"; globalTarget = "+sumClass.getGlobalTarget()+"; globalInUse = "+sumClass.getGlobalInUse());
+
+ int numServices = sumClass.getNumServices();
+ if (numServices == 0)
+ return;
+ int globalTarget = sumClass.getGlobalTarget();
+ int globalInUse = sumClass.getGlobalInUse();
+ int maximumTarget = maxActiveConnections - globalTarget;
+ if (maximumTarget > maxActiveConnections - globalInUse)
+ maximumTarget = maxActiveConnections - globalInUse;
+ if (maximumTarget < 0)
+ maximumTarget = 0;
+
+ // Compute FairTarget
+ int fairTarget = maxActiveConnections / numServices;
+ int remainder = maxActiveConnections % numServices;
+ // Randomly choose whether we get an addition to the FairTarget
+ if (randomNumberGenerator.nextInt(numServices) < remainder)
+ fairTarget++;
+
+ // Compute OptimalTarget
+ int localInUse = inUseConnections;
+ int optimalTarget = localMax;
+ if (localMax > localInUse)
+ optimalTarget--;
+ else
+ {
+ // We want a fast ramp up, so make this proportional to maxActiveConnections
+ int increment = maxActiveConnections >> 2;
+ if (increment == 0)
+ increment = 1;
+ optimalTarget += increment;
+ }
+
+ //System.out.println("maxTarget = "+maximumTarget+"; fairTarget = "+fairTarget+"; optimalTarget = "+optimalTarget);
+
+ // Now compute actual target
+ int target = maximumTarget;
+ if (target > fairTarget)
+ target = fairTarget;
+ if (target > optimalTarget)
+ target = optimalTarget;
+
+ // Write these values to the service data variables.
+ // NOTE that there is a race condition here; the target value depends on all the calculations above being accurate, and not changing out from under us.
+ // So, that's why we have a write lock around the pool calculations.
+
+ lockManager.updateServiceData(serviceTypeName, serviceName, pack(target, localInUse));
+
+ // Now, update our localMax, if it needs it.
+ if (target == localMax)
+ return;
+ localMax = target;
+ notifyAll();
+ }
+ finally
+ {
+ lockManager.leaveWriteLock(targetCalcLockName);
+ }
}
/** Shut down the bin, and release everything that is waiting on it.
@@ -252,5 +353,85 @@ public class ConnectionBin
ILockManager lockManager = LockManagerFactory.make(threadContext);
lockManager.endServiceActivity(serviceTypeName, serviceName);
}
+
+ // Protected classes and methods
+
+ protected static class SumClass implements IServiceDataAcceptor
+ {
+ protected final String serviceName;
+ protected int numServices = 0;
+ protected int globalTargetTally = 0;
+ protected int globalInUseTally = 0;
+
+ public SumClass(String serviceName)
+ {
+ this.serviceName = serviceName;
+ }
+
+ @Override
+ public boolean acceptServiceData(String serviceName, byte[] serviceData)
+ throws ManifoldCFException
+ {
+ numServices++;
+
+ if (!serviceName.equals(this.serviceName))
+ {
+ globalTargetTally += unpackTarget(serviceData);
+ globalInUseTally += unpackInUse(serviceData);
+ }
+ return false;
+ }
+
+ public int getNumServices()
+ {
+ return numServices;
+ }
+
+ public int getGlobalTarget()
+ {
+ return globalTargetTally;
+ }
+
+ public int getGlobalInUse()
+ {
+ return globalInUseTally;
+ }
+
+ }
+
+ protected static int unpackTarget(byte[] data)
+ {
+ if (data == null || data.length != 8)
+ return 0;
+ return (((int)data[0]) & 0xff) +
+ ((((int)data[1]) << 8) & 0xff00) +
+ ((((int)data[2]) << 16) & 0xff0000) +
+ ((((int)data[3]) << 24) & 0xff000000);
+ }
+
+ protected static int unpackInUse(byte[] data)
+ {
+ if (data == null || data.length != 8)
+ return 0;
+ return (((int)data[4]) & 0xff) +
+ ((((int)data[5]) << 8) & 0xff00) +
+ ((((int)data[6]) << 16) & 0xff0000) +
+ ((((int)data[7]) << 24) & 0xff000000);
+ }
+
+ protected static byte[] pack(int target, int inUse)
+ {
+ byte[] rval = new byte[8];
+ rval[0] = (byte)(target & 0xff);
+ rval[1] = (byte)((target >> 8) & 0xff);
+ rval[2] = (byte)((target >> 16) & 0xff);
+ rval[3] = (byte)((target >> 24) & 0xff);
+ rval[4] = (byte)(inUse & 0xff);
+ rval[5] = (byte)((inUse >> 8) & 0xff);
+ rval[6] = (byte)((inUse >> 16) & 0xff);
+ rval[7] = (byte)((inUse >> 24) & 0xff);
+ return rval;
+ }
+
}
Modified: manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java?rev=1552096&r1=1552095&r2=1552096&view=diff
==============================================================================
--- manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java (original)
+++ manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java Wed Dec 18 21:01:01 2013
@@ -38,32 +38,48 @@ public class FetchBin
protected final String serviceTypeName;
/** The (anonymous) service name */
protected final String serviceName;
- /** This is the last time a fetch was done on this bin */
- protected long lastFetchTime = 0L;
+ /** The target calculation lock name */
+ protected final String targetCalcLockName;
+
/** This is the minimum time between fetches for this bin, in ms. */
protected long minTimeBetweenFetches = Long.MAX_VALUE;
+
+ /** The local minimum time between fetches */
+ protected long localMinimum = Long.MAX_VALUE;
+
+ /** This is the last time a fetch was done on this bin */
+ protected long lastFetchTime = 0L;
/** Is the next fetch reserved? */
protected boolean reserveNextFetch = false;
/** The service type prefix for fetch bins */
protected final static String serviceTypePrefix = "_FETCHBIN_";
+ /** The target calculation lock prefix */
+ protected final static String targetCalcLockPrefix = "_FETCHBINTARGET_";
+
/** Constructor. */
public FetchBin(IThreadContext threadContext, String throttlingGroupName, String binName)
throws ManifoldCFException
{
this.binName = binName;
this.serviceTypeName = buildServiceTypeName(throttlingGroupName, binName);
+ this.targetCalcLockName = buildTargetCalcLockName(throttlingGroupName, binName);
// Now, register and activate service anonymously, and record the service name we get.
ILockManager lockManager = LockManagerFactory.make(threadContext);
this.serviceName = lockManager.registerServiceBeginServiceActivity(serviceTypeName, null, null);
}
- protected String buildServiceTypeName(String throttlingGroupName, String binName)
+ protected static String buildServiceTypeName(String throttlingGroupName, String binName)
{
return serviceTypePrefix + throttlingGroupName + "_" + binName;
}
+ protected static String buildTargetCalcLockName(String throttlingGroupName, String binName)
+ {
+ return targetCalcLockPrefix + throttlingGroupName + "_" + binName;
+ }
+
/** Get the bin name. */
public String getBinName()
{
@@ -76,8 +92,6 @@ public class FetchBin
{
- // Update the number and wake up any waiting threads; they will take care of everything.
+ // Update the number; the poller will wake up any waiting threads.
this.minTimeBetweenFetches = minTimeBetweenFetches;
- // Wake up everything that's waiting.
- notifyAll();
}
/** Reserve a request to fetch a document from this bin. The actual fetch is not yet committed
@@ -109,6 +123,7 @@ public class FetchBin
if (!reserveNextFetch)
throw new IllegalStateException("Can't clear a fetch reservation we don't have");
reserveNextFetch = false;
+ notifyAll();
}
/** Wait the necessary time to do the fetch. Presumes we've reserved the next fetch
@@ -126,7 +141,7 @@ public class FetchBin
if (!isAlive)
// Leave it to the caller to undo reservations
return false;
- if (minTimeBetweenFetches == Long.MAX_VALUE)
+ if (localMinimum == Long.MAX_VALUE)
{
// wait forever - but eventually someone will set a smaller interval and wake us up.
wait();
@@ -135,7 +150,7 @@ public class FetchBin
{
long currentTime = System.currentTimeMillis();
// Compute how long we have to wait, based on the current time and the time of the last fetch.
- long waitAmt = lastFetchTime + minTimeBetweenFetches - currentTime;
+ long waitAmt = lastFetchTime + localMinimum - currentTime;
if (waitAmt <= 0L)
{
// Note actual time we start the fetch.
@@ -153,7 +168,92 @@ public class FetchBin
public synchronized void poll(IThreadContext threadContext)
throws ManifoldCFException
{
- // MHL
+ ILockManager lockManager = LockManagerFactory.make(threadContext);
+ lockManager.enterWriteLock(targetCalcLockName);
+ try
+ {
+ // This is where the cross-cluster logic happens.
+ // Each service records the following information:
+ // -- the target rate, in fetches per millisecond
+ // -- the earliest possible time for the service's next fetch, in ms from start of epoch
+ // Target rates are apportioned in fetches-per-ms space, as follows:
+ // (1) Target rate is summed cross-cluster, excluding our local service. This is GlobalTarget.
+ // (2) MaximumTarget is computed, which is Maximum-GlobalTarget.
+ // (3) FairTarget is computed, which is Maximum/numServices (a floating-point division, so there is no remainder to apportion).
+ // (4) Finally, we compute Target rate by taking the minimum of MaximumTarget, FairTarget.
+ // The earliest time for the next fetch is computed as follows:
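+ // (1) Find the LATEST most recent fetch time across the services, including an updated time for
+ // the local service. NOTE(review): the code below takes the EARLIEST of these times, not the latest - confirm which is intended.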
+ // (2) Compute the next possible fetch time, using the Target rate and that fetch time.
+ // (3) The new targeted fetch time will be set to that value.
+
+ SumClass sumClass = new SumClass(serviceName);
+ lockManager.scanServiceData(serviceTypeName, sumClass);
+
+ int numServices = sumClass.getNumServices();
+ if (numServices == 0)
+ return;
+ double globalTarget = sumClass.getGlobalTarget();
+ long earliestTargetTime = sumClass.getEarliestTime();
+ long currentTime = System.currentTimeMillis();
+
+ if (lastFetchTime == 0L)
+ earliestTargetTime = currentTime;
+ else if (earliestTargetTime > lastFetchTime)
+ earliestTargetTime = lastFetchTime;
+
+ // Now, compute the target rate
+ double globalMaxFetchesPerMillisecond;
+ double maximumTarget;
+ double fairTarget;
+ if (minTimeBetweenFetches == 0.0)
+ {
+ //System.out.println(binName+":Global minimum milliseconds between fetches = 0.0");
+ globalMaxFetchesPerMillisecond = Double.MAX_VALUE;
+ maximumTarget = globalMaxFetchesPerMillisecond;
+ fairTarget = globalMaxFetchesPerMillisecond;
+ }
+ else
+ {
+ globalMaxFetchesPerMillisecond = 1.0 / minTimeBetweenFetches;
+ //System.out.println(binName+":Global max fetches per millisecond = "+globalMaxFetchesPerMillisecond);
+ maximumTarget = globalMaxFetchesPerMillisecond - globalTarget;
+ if (maximumTarget < 0.0)
+ maximumTarget = 0.0;
+
+ // Compute FairTarget
+ fairTarget = globalMaxFetchesPerMillisecond / numServices;
+ }
+
+ // Now compute actual target
+ double inverseTarget = maximumTarget;
+ if (inverseTarget > fairTarget)
+ inverseTarget = fairTarget;
+
+ long target;
+ if (inverseTarget == 0.0)
+ target = Long.MAX_VALUE;
+ else
+ target = (long)(1.0/inverseTarget +0.5);
+
+ long nextFetchTime = earliestTargetTime + target;
+
+ lockManager.updateServiceData(serviceTypeName, serviceName, pack(inverseTarget, nextFetchTime));
+
+ // Update local parameters: the rate, and the next time.
+ // But in order to update the next time, we have to update the last time.
+ if (target == localMinimum && earliestTargetTime == lastFetchTime)
+ return;
+ //System.out.println(binName+":Setting localMinimum="+target+"; last fetch time="+earliestTargetTime);
+ localMinimum = target;
+ lastFetchTime = earliestTargetTime;
+ notifyAll();
+ }
+ finally
+ {
+ lockManager.leaveWriteLock(targetCalcLockName);
+ }
+
}
/** Shut the bin down, and wake up all threads waiting on it.
@@ -166,5 +266,103 @@ public class FetchBin
ILockManager lockManager = LockManagerFactory.make(threadContext);
lockManager.endServiceActivity(serviceTypeName, serviceName);
}
+
+ // Protected classes and methods
+
+ protected static class SumClass implements IServiceDataAcceptor
+ {
+ protected final String serviceName;
+ protected int numServices = 0;
+ protected double globalTargetTally = 0;
+ protected long earliestTime = Long.MAX_VALUE;
+
+ public SumClass(String serviceName)
+ {
+ this.serviceName = serviceName;
+ }
+
+ @Override
+ public boolean acceptServiceData(String serviceName, byte[] serviceData)
+ throws ManifoldCFException
+ {
+ numServices++;
+
+ if (!serviceName.equals(this.serviceName))
+ {
+ globalTargetTally += unpackTarget(serviceData);
+ long checkTime = unpackEarliestTime(serviceData);
+ if (checkTime < earliestTime)
+ earliestTime = checkTime;
+ }
+ return false;
+ }
+
+ public int getNumServices()
+ {
+ return numServices;
+ }
+
+ public double getGlobalTarget()
+ {
+ return globalTargetTally;
+ }
+
+ public long getEarliestTime()
+ {
+ return earliestTime;
+ }
+ }
+
+ /** Unpack the target rate (fetches per millisecond) from packed service data.
+ * The expected layout is the 16-byte array written by pack(double, long): bytes 0-7 hold the
+ * target's double bits, least-significant byte first, and bytes 8-15 hold the time value.
+ *@param data is the packed service data; may be null.
+ *@return the target rate, or 0.0 if the data is null or is not exactly 16 bytes long.
+ */
+ protected static double unpackTarget(byte[] data)
+ {
+ // pack() always emits 16 bytes, so testing for a length of 8 here would reject every
+ // real record and report 0.0, which makes the cross-service target sum always zero.
+ if (data == null || data.length != 16)
+ return 0.0;
+ return Double.longBitsToDouble((((long)data[0]) & 0xffL) +
+ ((((long)data[1]) << 8) & 0xff00L) +
+ ((((long)data[2]) << 16) & 0xff0000L) +
+ ((((long)data[3]) << 24) & 0xff000000L) +
+ ((((long)data[4]) << 32) & 0xff00000000L) +
+ ((((long)data[5]) << 40) & 0xff0000000000L) +
+ ((((long)data[6]) << 48) & 0xff000000000000L) +
+ ((((long)data[7]) << 56) & 0xff00000000000000L));
+ }
+
+ protected static long unpackEarliestTime(byte[] data)
+ {
+ if (data == null || data.length != 16)
+ return Long.MAX_VALUE;
+ return (((long)data[8]) & 0xffL) +
+ ((((long)data[9]) << 8) & 0xff00L) +
+ ((((long)data[10]) << 16) & 0xff0000L) +
+ ((((long)data[11]) << 24) & 0xff000000L) +
+ ((((long)data[12]) << 32) & 0xff00000000L) +
+ ((((long)data[13]) << 40) & 0xff0000000000L) +
+ ((((long)data[14]) << 48) & 0xff000000000000L) +
+ ((((long)data[15]) << 56) & 0xff00000000000000L);
+ }
+
+ protected static byte[] pack(double targetDouble, long earliestTime)
+ {
+ long target = Double.doubleToLongBits(targetDouble);
+ byte[] rval = new byte[16];
+ rval[0] = (byte)(target & 0xffL);
+ rval[1] = (byte)((target >> 8) & 0xffL);
+ rval[2] = (byte)((target >> 16) & 0xffL);
+ rval[3] = (byte)((target >> 24) & 0xffL);
+ rval[4] = (byte)((target >> 32) & 0xffL);
+ rval[5] = (byte)((target >> 40) & 0xffL);
+ rval[6] = (byte)((target >> 48) & 0xffL);
+ rval[7] = (byte)((target >> 56) & 0xffL);
+ rval[8] = (byte)(earliestTime & 0xffL);
+ rval[9] = (byte)((earliestTime >> 8) & 0xffL);
+ rval[10] = (byte)((earliestTime >> 16) & 0xffL);
+ rval[11] = (byte)((earliestTime >> 24) & 0xffL);
+ rval[12] = (byte)((earliestTime >> 32) & 0xffL);
+ rval[13] = (byte)((earliestTime >> 40) & 0xffL);
+ rval[14] = (byte)((earliestTime >> 48) & 0xffL);
+ rval[15] = (byte)((earliestTime >> 56) & 0xffL);
+ return rval;
+ }
+
}
Modified: manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java?rev=1552096&r1=1552095&r2=1552096&view=diff
==============================================================================
--- manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java (original)
+++ manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java Wed Dec 18 21:01:01 2013
@@ -20,6 +20,7 @@ package org.apache.manifoldcf.core.throt
import org.apache.manifoldcf.core.interfaces.*;
import org.apache.manifoldcf.core.system.ManifoldCF;
+import java.util.*;
/** Throttles for a bin.
* An instance of this class keeps track of the information needed to bandwidth throttle access
@@ -75,6 +76,15 @@ public class ThrottleBin
protected final String serviceTypeName;
/** The (anonymous) service name */
protected final String serviceName;
+ /** The target calculation lock name */
+ protected final String targetCalcLockName;
+
+ /** The minimum milliseconds per byte */
+ protected double minimumMillisecondsPerByte = Double.MAX_VALUE;
+
+ /** The local minimum milliseconds per byte */
+ protected double localMinimum = Double.MAX_VALUE;
+
/** This is the reference count for this bin (which records active references) */
protected volatile int refCount = 0;
/** The inverse rate estimate of the first fetch, in ms/byte */
@@ -87,11 +97,12 @@ public class ThrottleBin
protected long seriesStartTime = -1L;
/** Total actual bytes read in this series; this includes fetches in progress */
protected long totalBytesRead = -1L;
- /** The minimum milliseconds per byte */
- protected double minimumMillisecondsPerByte = Double.MAX_VALUE;
-
+
/** The service type prefix for throttle bins */
protected final static String serviceTypePrefix = "_THROTTLEBIN_";
+
+ /** The target calculation lock prefix */
+ protected final static String targetCalcLockPrefix = "_THROTTLEBINTARGET_";
/** Constructor. */
public ThrottleBin(IThreadContext threadContext, String throttlingGroupName, String binName)
@@ -99,16 +110,22 @@ public class ThrottleBin
{
this.binName = binName;
this.serviceTypeName = buildServiceTypeName(throttlingGroupName, binName);
+ this.targetCalcLockName = buildTargetCalcLockName(throttlingGroupName, binName);
// Now, register and activate service anonymously, and record the service name we get.
ILockManager lockManager = LockManagerFactory.make(threadContext);
this.serviceName = lockManager.registerServiceBeginServiceActivity(serviceTypeName, null, null);
}
- protected String buildServiceTypeName(String throttlingGroupName, String binName)
+ protected static String buildServiceTypeName(String throttlingGroupName, String binName)
{
return serviceTypePrefix + throttlingGroupName + "_" + binName;
}
+ protected static String buildTargetCalcLockName(String throttlingGroupName, String binName)
+ {
+ return targetCalcLockPrefix + throttlingGroupName + "_" + binName;
+ }
+
/** Get the bin name. */
public String getBinName()
{
@@ -116,13 +133,9 @@ public class ThrottleBin
}
/** Update minimumMillisecondsPerBytePerServer */
- public void updateMinimumMillisecondsPerByte(double min)
+ public synchronized void updateMinimumMillisecondsPerByte(double min)
{
- synchronized (this)
- {
- this.minimumMillisecondsPerByte = min;
- notifyAll();
- }
+ this.minimumMillisecondsPerByte = min;
}
/** Note the start of a fetch operation for a bin. Call this method just before the actual stream access begins.
@@ -190,7 +203,7 @@ public class ThrottleBin
}
// If we haven't set a proper throttle yet, wait until we do.
- if (minimumMillisecondsPerByte == Double.MAX_VALUE)
+ if (localMinimum == Double.MAX_VALUE)
{
wait();
continue;
@@ -200,7 +213,7 @@ public class ThrottleBin
long estimatedTime = (long)(rateEstimate * (double)byteCount);
// Figure out how long the total byte count should take, to meet the constraint
- long desiredEndTime = seriesStartTime + (long)(((double)(totalBytesRead + (long)byteCount)) * minimumMillisecondsPerByte);
+ long desiredEndTime = seriesStartTime + (long)(((double)(totalBytesRead + (long)byteCount)) * localMinimum);
// The wait time is the difference between our desired end time, minus the estimated time to read the data, and the
@@ -273,7 +286,85 @@ public class ThrottleBin
public synchronized void poll(IThreadContext threadContext)
throws ManifoldCFException
{
- // MHL
+
+ // Enter write lock
+ ILockManager lockManager = LockManagerFactory.make(threadContext);
+ lockManager.enterWriteLock(targetCalcLockName);
+ try
+ {
+ // The cross-cluster apportionment of byte fetching goes here.
+ // For byte-rate throttling, the apportioning algorithm is simple. First, it's done
+ // in bytes per millisecond, which is the inverse of what we actually use for the
+ // rest of this class. Each service posts its current value for the maximum bytes
+ // per millisecond, and a target value for the same.
+ // The target value is computed as follows:
+ // (1) Target is summed cross-cluster, excluding our local service. This is GlobalTarget.
+ // (2) MaximumTarget is computed, which is Maximum-GlobalTarget.
+ // (3) FairTarget is computed, which is Maximum/numServices (a floating-point division, so there is no remainder to apportion).
+ // (4) Finally, we compute Target by taking the minimum of MaximumTarget, FairTarget.
+
+ // Compute MaximumTarget
+ SumClass sumClass = new SumClass(serviceName);
+ lockManager.scanServiceData(serviceTypeName, sumClass);
+
+ int numServices = sumClass.getNumServices();
+ if (numServices == 0)
+ return;
+ double globalTarget = sumClass.getGlobalTarget();
+ double globalMaxBytesPerMillisecond;
+ double maximumTarget;
+ double fairTarget;
+ if (minimumMillisecondsPerByte == 0.0)
+ {
+ //System.out.println(binName+":Global minimum milliseconds per byte = 0.0");
+ globalMaxBytesPerMillisecond = Double.MAX_VALUE;
+ maximumTarget = globalMaxBytesPerMillisecond;
+ fairTarget = globalMaxBytesPerMillisecond;
+ }
+ else
+ {
+ globalMaxBytesPerMillisecond = 1.0 / minimumMillisecondsPerByte;
+ //System.out.println(binName+":Global max bytes per millisecond = "+globalMaxBytesPerMillisecond);
+ maximumTarget = globalMaxBytesPerMillisecond - globalTarget;
+ if (maximumTarget < 0.0)
+ maximumTarget = 0.0;
+
+ // Compute FairTarget
+ fairTarget = globalMaxBytesPerMillisecond / numServices;
+ }
+
+ // Now compute actual target
+ double inverseTarget = maximumTarget;
+ if (inverseTarget > fairTarget)
+ inverseTarget = fairTarget;
+
+ //System.out.println(binName+":Inverse target = "+inverseTarget+"; maximumTarget = "+maximumTarget+"; fairTarget = "+fairTarget);
+
+ // Write these values to the service data variables.
+ // NOTE that there is a race condition here; the target value depends on all the calculations above being accurate, and not changing out from under us.
+ // So, that's why we have a write lock around the pool calculations.
+
+ lockManager.updateServiceData(serviceTypeName, serviceName, pack(inverseTarget));
+
+ // Update our local minimum.
+ double target;
+ if (inverseTarget == 0.0)
+ target = Double.MAX_VALUE;
+ else
+ target = 1.0 / inverseTarget;
+
+ // Reset local minimum, if it has changed.
+ if (target == localMinimum)
+ return;
+ //System.out.println(binName+":Updating local minimum to "+target);
+ localMinimum = target;
+ notifyAll();
+ }
+ finally
+ {
+ lockManager.leaveWriteLock(targetCalcLockName);
+ }
+
}
/** Shut down this bin.
@@ -286,5 +377,74 @@ public class ThrottleBin
ILockManager lockManager = LockManagerFactory.make(threadContext);
lockManager.endServiceActivity(serviceTypeName, serviceName);
}
+
+ // Protected classes and methods
+
+ protected static class SumClass implements IServiceDataAcceptor
+ {
+ protected final String serviceName;
+ protected int numServices = 0;
+ protected double globalTargetTally = 0;
+
+ public SumClass(String serviceName)
+ {
+ this.serviceName = serviceName;
+ }
+
+ @Override
+ public boolean acceptServiceData(String serviceName, byte[] serviceData)
+ throws ManifoldCFException
+ {
+ numServices++;
+
+ if (!serviceName.equals(this.serviceName))
+ {
+ globalTargetTally += unpackTarget(serviceData);
+ }
+ return false;
+ }
+
+ public int getNumServices()
+ {
+ return numServices;
+ }
+
+ public double getGlobalTarget()
+ {
+ return globalTargetTally;
+ }
+
+ }
+
+ protected static double unpackTarget(byte[] data)
+ {
+ if (data == null || data.length != 8)
+ return 0.0;
+ return Double.longBitsToDouble((((long)data[0]) & 0xffL) +
+ ((((long)data[1]) << 8) & 0xff00L) +
+ ((((long)data[2]) << 16) & 0xff0000L) +
+ ((((long)data[3]) << 24) & 0xff000000L) +
+ ((((long)data[4]) << 32) & 0xff00000000L) +
+ ((((long)data[5]) << 40) & 0xff0000000000L) +
+ ((((long)data[6]) << 48) & 0xff000000000000L) +
+ ((((long)data[7]) << 56) & 0xff00000000000000L));
+ }
+
+ protected static byte[] pack(double targetDouble)
+ {
+ long target = Double.doubleToLongBits(targetDouble);
+ byte[] rval = new byte[8];
+ rval[0] = (byte)(target & 0xffL);
+ rval[1] = (byte)((target >> 8) & 0xffL);
+ rval[2] = (byte)((target >> 16) & 0xffL);
+ rval[3] = (byte)((target >> 24) & 0xffL);
+ rval[4] = (byte)((target >> 32) & 0xffL);
+ rval[5] = (byte)((target >> 40) & 0xffL);
+ rval[6] = (byte)((target >> 48) & 0xffL);
+ rval[7] = (byte)((target >> 56) & 0xffL);
+ return rval;
+ }
+
+
}