You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/18 15:20:36 UTC
svn commit: r1551949 -
/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java
Author: kwright
Date: Wed Dec 18 14:20:36 2013
New Revision: 1551949
URL: http://svn.apache.org/r1551949
Log:
Install byte-rate apportioning code. Doesn't pass tests yet though.
Modified:
manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java
Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java?rev=1551949&r1=1551948&r2=1551949&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java Wed Dec 18 14:20:36 2013
@@ -97,6 +97,11 @@ public class ThrottleBin
protected long seriesStartTime = -1L;
/** Total actual bytes read in this series; this includes fetches in progress */
protected long totalBytesRead = -1L;
+
+ /** The time of the last poll */
+ protected long lastPollTime = -1L;
+ /** The number of bytes read since the last poll */
+ protected long lastPollBytes = 0L;
/** The service type prefix for throttle bins */
protected final static String serviceTypePrefix = "_THROTTLEBIN_";
@@ -198,6 +203,7 @@ public class ThrottleBin
estimateInProgress = true;
// Add these bytes to the estimated total
totalBytesRead += (long)byteCount;
+ lastPollBytes += (long)byteCount;
// Exit early; this thread isn't going to do any waiting
return true;
}
@@ -225,6 +231,7 @@ public class ThrottleBin
{
// Add these bytes to the estimated total
totalBytesRead += (long)byteCount;
+ lastPollBytes += (long)byteCount;
return true;
}
@@ -286,13 +293,112 @@ public class ThrottleBin
public synchronized void poll(IThreadContext threadContext)
throws ManifoldCFException
{
+ // Get the old time, new time, byte amounts
+ long oldTime = lastPollTime;
+ long newTime = System.currentTimeMillis();
+ long byteCount = lastPollBytes;
+ lastPollBytes = 0L;
+
+ // Enter write lock
ILockManager lockManager = LockManagerFactory.make(threadContext);
lockManager.enterWriteLock(targetCalcLockName);
try
{
- // MHL
- double target = minimumMillisecondsPerByte;
- // MHL
+ // The cross-cluster apportionment of byte fetching goes here.
+ // For byte-rate throttling, the apportioning algorithm is simple. First, it's done
+ // in bytes per millisecond, which is the inverse of what we actually use for the
+ // rest of this class. Each service posts its current value for the maximum bytes
+ // per millisecond, and a target value for the same.
+ // The target value is computed as follows:
+ // (1) Target is summed cross-cluster, excluding our local service. This is GlobalTarget.
+ // (2) In-use is summed cross-cluster, excluding our local service. This is GlobalInUse.
+ // (3) MaximumTarget is computed, which is min(Maximum-GlobalTarget,Maximum-GlobalInUse).
+ // (4) FairTarget is computed, which is Maximum/numServices + rand(Maximum%numServices).
+ // (5) OptimalTarget is computed: We start with the current local target (in bytes per millisecond).
+ // If the current local target exceeds current local in-use value, then OptimalTarget
+ // is a point 1/2 the way between local target and local in-use value. Otherwise, we add
+ // 1/4 of the local target value to the former local target value.
+ // (6) Finally, we compute Target by taking the minimum of MaximumTarget, FairTarget, and OptimalTarget.
+
+ // The tricky part of all this is computing the local in-use value. Ideally, this would be
+ // the instantaneous value *right now*. But we can approximate this by computing the
+ // number of bytes fetched since the last polling call, divided by the milliseconds elapsed
+ // since then.
+
+ // Compute MaximumTarget
+ SumClass sumClass = new SumClass(serviceName);
+ lockManager.scanServiceData(serviceTypeName, sumClass);
+
+ int numServices = sumClass.getNumServices();
+ if (numServices == 0)
+ return;
+ double globalTarget = sumClass.getGlobalTarget();
+ double globalInUse = sumClass.getGlobalInUse();
+ double globalMaxBytesPerMillisecond;
+ double maximumTarget;
+ double fairTarget;
+ if (minimumMillisecondsPerByte == 0.0)
+ {
+ globalMaxBytesPerMillisecond = Double.MAX_VALUE;
+ maximumTarget = globalMaxBytesPerMillisecond;
+ fairTarget = globalMaxBytesPerMillisecond;
+ }
+ else
+ {
+ globalMaxBytesPerMillisecond = 1.0 / minimumMillisecondsPerByte;
+
+ maximumTarget = globalMaxBytesPerMillisecond - globalTarget;
+ if (maximumTarget > globalMaxBytesPerMillisecond - globalInUse)
+ maximumTarget = globalMaxBytesPerMillisecond - globalInUse;
+ if (maximumTarget < 0.0)
+ maximumTarget = 0.0;
+
+ // Compute FairTarget
+ fairTarget = globalMaxBytesPerMillisecond / numServices;
+ }
+
+ // Compute localInUse
+ double localInUse;
+ if (oldTime == -1L || newTime == oldTime)
+ {
+ // No idea what our local in use value is; use zero for now
+ localInUse = 0.0;
+ }
+ else
+ {
+ localInUse = byteCount / (newTime - oldTime);
+ }
+ double optimalTarget;
+ if (localMinimum == 0.0)
+ optimalTarget = Double.MAX_VALUE;
+ else
+ optimalTarget = 1.0 / localMinimum;
+ if (optimalTarget > localInUse)
+ optimalTarget -= (optimalTarget-localInUse) / 4.0;
+ else
+ optimalTarget += optimalTarget / 4.0;
+
+ // Now compute actual target
+ double inverseTarget = maximumTarget;
+ if (inverseTarget > fairTarget)
+ inverseTarget = fairTarget;
+ if (inverseTarget > optimalTarget)
+ inverseTarget = optimalTarget;
+
+ // Write these values to the service data variables.
+ // NOTE that there is a race condition here; the target value depends on all the calculations above being accurate, and not changing out from under us.
+ // So, that's why we have a write lock around the pool calculations.
+
+ lockManager.updateServiceData(serviceTypeName, serviceName, pack(inverseTarget, localInUse));
+
+ // Update our local minimum.
+ double target;
+ if (inverseTarget == 0.0)
+ target = Double.MAX_VALUE;
+ else
+ target = 1.0 / inverseTarget;
+
+ // Reset local minimum, if it has changed.
if (target == localMinimum)
return;
localMinimum = target;
@@ -315,5 +421,104 @@ public class ThrottleBin
ILockManager lockManager = LockManagerFactory.make(threadContext);
lockManager.endServiceActivity(serviceTypeName, serviceName);
}
+
+ // Protected classes and methods
+
+ protected static class SumClass implements IServiceDataAcceptor
+ {
+ protected final String serviceName;
+ protected int numServices = 0;
+ protected double globalTargetTally = 0;
+ protected double globalInUseTally = 0;
+
+ public SumClass(String serviceName)
+ {
+ this.serviceName = serviceName;
+ }
+
+ @Override
+ public boolean acceptServiceData(String serviceName, byte[] serviceData)
+ throws ManifoldCFException
+ {
+ numServices++;
+
+ if (!serviceName.equals(this.serviceName))
+ {
+ globalTargetTally += unpackTarget(serviceData);
+ globalInUseTally += unpackInUse(serviceData);
+ }
+ return false;
+ }
+
+ public int getNumServices()
+ {
+ return numServices;
+ }
+
+ public double getGlobalTarget()
+ {
+ return globalTargetTally;
+ }
+
+ public double getGlobalInUse()
+ {
+ return globalInUseTally;
+ }
+
+ }
+
+ protected static double unpackTarget(byte[] data)
+ {
+ if (data == null || data.length != 16)
+ return 0;
+ return Double.longBitsToDouble((((long)data[0]) & 0xffL) +
+ ((((long)data[1]) << 8) & 0xff00L) +
+ ((((long)data[2]) << 16) & 0xff0000L) +
+ ((((long)data[3]) << 24) & 0xff000000L) +
+ ((((long)data[4]) << 32) & 0xff00000000L) +
+ ((((long)data[5]) << 40) & 0xff0000000000L) +
+ ((((long)data[6]) << 48) & 0xff000000000000L) +
+ ((((long)data[7]) << 56) & 0xff00000000000000L));
+ }
+
+ protected static double unpackInUse(byte[] data)
+ {
+ if (data == null || data.length != 16)
+ return 0;
+ return Double.longBitsToDouble((((long)data[8]) & 0xffL) +
+ ((((long)data[9]) << 8) & 0xff00L) +
+ ((((long)data[10]) << 16) & 0xff0000L) +
+ ((((long)data[11]) << 24) & 0xff000000L) +
+ ((((long)data[12]) << 32) & 0xff00000000L) +
+ ((((long)data[13]) << 40) & 0xff0000000000L) +
+ ((((long)data[14]) << 48) & 0xff000000000000L) +
+ ((((long)data[15]) << 56) & 0xff00000000000000L));
+ }
+
+ protected static byte[] pack(double targetDouble, double inUseDouble)
+ {
+ long target = Double.doubleToLongBits(targetDouble);
+ long inUse = Double.doubleToLongBits(inUseDouble);
+ byte[] rval = new byte[16];
+ rval[0] = (byte)(target & 0xffL);
+ rval[1] = (byte)((target >> 8) & 0xffL);
+ rval[2] = (byte)((target >> 16) & 0xffL);
+ rval[3] = (byte)((target >> 24) & 0xffL);
+ rval[4] = (byte)((target >> 32) & 0xffL);
+ rval[5] = (byte)((target >> 40) & 0xffL);
+ rval[6] = (byte)((target >> 48) & 0xffL);
+ rval[7] = (byte)((target >> 56) & 0xffL);
+ rval[8] = (byte)(inUse & 0xffL);
+ rval[9] = (byte)((inUse >> 8) & 0xffL);
+ rval[10] = (byte)((inUse >> 16) & 0xffL);
+ rval[11] = (byte)((inUse >> 24) & 0xffL);
+ rval[12] = (byte)((inUse >> 32) & 0xffL);
+ rval[13] = (byte)((inUse >> 40) & 0xffL);
+ rval[14] = (byte)((inUse >> 48) & 0xffL);
+ rval[15] = (byte)((inUse >> 56) & 0xffL);
+ return rval;
+ }
+
+
}