You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/18 15:20:36 UTC

svn commit: r1551949 - /manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java

Author: kwright
Date: Wed Dec 18 14:20:36 2013
New Revision: 1551949

URL: http://svn.apache.org/r1551949
Log:
Install byte-rate apportioning code.  Doesn't pass tests yet though.

Modified:
    manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java

Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java?rev=1551949&r1=1551948&r2=1551949&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java Wed Dec 18 14:20:36 2013
@@ -97,6 +97,11 @@ public class ThrottleBin
   protected long seriesStartTime = -1L;
   /** Total actual bytes read in this series; this includes fetches in progress */
   protected long totalBytesRead = -1L;
+
+  /** The time of the last poll */
+  protected long lastPollTime = -1L;
+  /** The number of bytes read since the last poll */
+  protected long lastPollBytes = 0L;
   
   /** The service type prefix for throttle bins */
   protected final static String serviceTypePrefix = "_THROTTLEBIN_";
@@ -198,6 +203,7 @@ public class ThrottleBin
           estimateInProgress = true;
           // Add these bytes to the estimated total
           totalBytesRead += (long)byteCount;
+          lastPollBytes += (long)byteCount;
           // Exit early; this thread isn't going to do any waiting
           return true;
         }
@@ -225,6 +231,7 @@ public class ThrottleBin
         {
           // Add these bytes to the estimated total
           totalBytesRead += (long)byteCount;
+          lastPollBytes += (long)byteCount;
           return true;
         }
         
@@ -286,13 +293,112 @@ public class ThrottleBin
   public synchronized void poll(IThreadContext threadContext)
     throws ManifoldCFException
   {
+    // Get the old time, new time, byte amounts
+    long oldTime = lastPollTime;
+    long newTime = System.currentTimeMillis();
+    long byteCount = lastPollBytes;
+    lastPollBytes = 0L;
+    
+    // Enter write lock
     ILockManager lockManager = LockManagerFactory.make(threadContext);
     lockManager.enterWriteLock(targetCalcLockName);
     try
     {
-      // MHL
-      double target = minimumMillisecondsPerByte;
-      // MHL
+      // The cross-cluster apportionment of byte fetching goes here.
+      // For byte-rate throttling, the apportioning algorithm is simple.  First, it's done
+      // in bytes per millisecond, which is the inverse of what we actually use for the
+      // rest of this class.  Each service posts its current value for the maximum bytes
+      // per millisecond, and a target value for the same.
+      // The target value is computed as follows:
+      // (1) Target is summed cross-cluster, excluding our local service.  This is GlobalTarget.
+      // (2) In-use is summed cross-cluster, excluding our local service.  This is GlobalInUse.
+      // (3) MaximumTarget is computed, which is min(Maximum-GlobalTarget,Maximum-GlobalInUse).
+      // (4) FairTarget is computed, which is Maximum/numServices + rand(Maximum%numServices).
+      // (5) OptimalTarget is computed: We start with the current local target (in bytes per millisecond).
+      //    If the current local target exceeds the current local in-use value, then OptimalTarget
+      //    moves 1/4 of the way from the local target toward the local in-use value (this is what
+      //    the code below does).  Otherwise, we add 1/4 of the local target value to the local target.
+      // (6) Finally, we compute Target by taking the minimum of MaximumTarget, FairTarget, and OptimalTarget.
+
+      // The tricky part of all this is computing the local in-use value.  Ideally, this would be
+      // the instantaneous value *right now*.  But we can approximate this by computing the
+      // number of bytes fetched since the last polling call, divided by the milliseconds elapsed
+      // since then.
+      
+      // Compute MaximumTarget
+      SumClass sumClass = new SumClass(serviceName);
+      lockManager.scanServiceData(serviceTypeName, sumClass);
+        
+      int numServices = sumClass.getNumServices();
+      if (numServices == 0)
+        return;
+      double globalTarget = sumClass.getGlobalTarget();
+      double globalInUse = sumClass.getGlobalInUse();
+      double globalMaxBytesPerMillisecond;
+      double maximumTarget;
+      double fairTarget;
+      if (minimumMillisecondsPerByte == 0.0)
+      {
+        globalMaxBytesPerMillisecond = Double.MAX_VALUE;
+        maximumTarget = globalMaxBytesPerMillisecond;
+        fairTarget = globalMaxBytesPerMillisecond;
+      }
+      else
+      {
+        globalMaxBytesPerMillisecond = 1.0 / minimumMillisecondsPerByte;
+
+        maximumTarget = globalMaxBytesPerMillisecond - globalTarget;
+        if (maximumTarget > globalMaxBytesPerMillisecond - globalInUse)
+          maximumTarget = globalMaxBytesPerMillisecond - globalInUse;
+        if (maximumTarget < 0.0)
+          maximumTarget = 0.0;
+
+        // Compute FairTarget
+        fairTarget = globalMaxBytesPerMillisecond / numServices;
+      }
+
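+      // Compute localInUse, in bytes per millisecond (NOTE(review): the division below is long/long, so the quotient is truncated before it becomes a double)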
+      double localInUse;
+      if (oldTime == -1L || newTime == oldTime)
+      {
+        // No idea what our local in use value is; use zero for now
+        localInUse = 0.0;
+      }
+      else
+      {
+        localInUse = byteCount / (newTime - oldTime);
+      }
+      double optimalTarget;
+      if (localMinimum == 0.0)
+        optimalTarget = Double.MAX_VALUE;
+      else
+        optimalTarget = 1.0 / localMinimum;
+      if (optimalTarget > localInUse)
+        optimalTarget -= (optimalTarget-localInUse) / 4.0;
+      else
+        optimalTarget += optimalTarget / 4.0;
+
+      // Now compute actual target
+      double inverseTarget = maximumTarget;
+      if (inverseTarget > fairTarget)
+        inverseTarget = fairTarget;
+      if (inverseTarget > optimalTarget)
+        inverseTarget = optimalTarget;
+
+      // Write these values to the service data variables.
+      // NOTE that there is a race condition here; the target value depends on all the calculations above being accurate, and not changing out from under us.
+      // So, that's why we have a write lock around the pool calculations.
+        
+      lockManager.updateServiceData(serviceTypeName, serviceName, pack(inverseTarget, localInUse));
+
+      // Update our local minimum.
+      double target;
+      if (inverseTarget == 0.0)
+        target = Double.MAX_VALUE;
+      else
+        target = 1.0 / inverseTarget;
+      
+      // Reset local minimum, if it has changed.
       if (target == localMinimum)
         return;
       localMinimum = target;
@@ -315,5 +421,104 @@ public class ThrottleBin
     ILockManager lockManager = LockManagerFactory.make(threadContext);
     lockManager.endServiceActivity(serviceTypeName, serviceName);
   }
+  
+  // Protected classes and methods
+  
+  protected static class SumClass implements IServiceDataAcceptor // Sums the target and in-use values found in the service data of every service except the named one
+  {
+    protected final String serviceName; // the service whose data is left out of both tallies
+    protected int numServices = 0; // number of acceptServiceData() calls, i.e. all services seen, including the excluded one
+    protected double globalTargetTally = 0; // running sum of unpackTarget() over the other services
+    protected double globalInUseTally = 0; // running sum of unpackInUse() over the other services
+    
+    public SumClass(String serviceName) // serviceName: the service to exclude from the tallies
+    {
+      this.serviceName = serviceName;
+    }
+    
+    @Override
+    public boolean acceptServiceData(String serviceName, byte[] serviceData) // called once per service; the parameter shadows the field of the same name
+      throws ManifoldCFException
+    {
+      numServices++; // counted before the name check, so the excluded service is still counted
+
+      if (!serviceName.equals(this.serviceName)) // only other services contribute to the sums
+      {
+        globalTargetTally += unpackTarget(serviceData);
+        globalInUseTally += unpackInUse(serviceData);
+      }
+      return false; // NOTE(review): presumably false means "keep scanning" -- confirm against IServiceDataAcceptor
+    }
+
+    public int getNumServices() // returns the count of services seen so far
+    {
+      return numServices;
+    }
+    
+    public double getGlobalTarget() // returns the summed target of the other services
+    {
+      return globalTargetTally;
+    }
+    
+    public double getGlobalInUse() // returns the summed in-use value of the other services
+    {
+      return globalInUseTally;
+    }
+    
+  }
+  
+  protected static double unpackTarget(byte[] data) // Decodes the target value stored in bytes 0-7 of a 16-byte array, the layout written by pack()
+  {
+    if (data == null || data.length != 16) // missing or wrongly sized data is read as 0
+      return 0;
+    return Double.longBitsToDouble((((long)data[0]) & 0xffL) + // byte 0 supplies the lowest 8 bits of the 64-bit pattern
+      ((((long)data[1]) << 8) & 0xff00L) + // each mask keeps one byte lane and drops the sign extension from the byte-to-long cast
+      ((((long)data[2]) << 16) & 0xff0000L) +
+      ((((long)data[3]) << 24) & 0xff000000L) +
+      ((((long)data[4]) << 32) & 0xff00000000L) +
+      ((((long)data[5]) << 40) & 0xff0000000000L) +
+      ((((long)data[6]) << 48) & 0xff000000000000L) +
+      ((((long)data[7]) << 56) & 0xff00000000000000L)); // byte 7 supplies the highest 8 bits
+  }
+
+  protected static double unpackInUse(byte[] data) // Decodes the in-use value stored in bytes 8-15 of a 16-byte array, the layout written by pack()
+  {
+    if (data == null || data.length != 16) // missing or wrongly sized data is read as 0
+      return 0;
+    return Double.longBitsToDouble((((long)data[8]) & 0xffL) + // byte 8 supplies the lowest 8 bits of the 64-bit pattern
+      ((((long)data[9]) << 8) & 0xff00L) + // each mask keeps one byte lane and drops the sign extension from the byte-to-long cast
+      ((((long)data[10]) << 16) & 0xff0000L) +
+      ((((long)data[11]) << 24) & 0xff000000L) +
+      ((((long)data[12]) << 32) & 0xff00000000L) +
+      ((((long)data[13]) << 40) & 0xff0000000000L) +
+      ((((long)data[14]) << 48) & 0xff000000000000L) +
+      ((((long)data[15]) << 56) & 0xff00000000000000L)); // byte 15 supplies the highest 8 bits
+  }
+
+  protected static byte[] pack(double targetDouble, double inUseDouble) // Encodes two doubles into 16 bytes: target in bytes 0-7, in-use in bytes 8-15
+  {
+    long target = Double.doubleToLongBits(targetDouble); // 64-bit pattern of the target value
+    long inUse = Double.doubleToLongBits(inUseDouble); // 64-bit pattern of the in-use value
+    byte[] rval = new byte[16];
+    rval[0] = (byte)(target & 0xffL); // lowest 8 bits of target go first
+    rval[1] = (byte)((target >> 8) & 0xffL);
+    rval[2] = (byte)((target >> 16) & 0xffL);
+    rval[3] = (byte)((target >> 24) & 0xffL);
+    rval[4] = (byte)((target >> 32) & 0xffL);
+    rval[5] = (byte)((target >> 40) & 0xffL);
+    rval[6] = (byte)((target >> 48) & 0xffL);
+    rval[7] = (byte)((target >> 56) & 0xffL); // highest 8 bits of target go last
+    rval[8] = (byte)(inUse & 0xffL); // in-use follows, again lowest 8 bits first
+    rval[9] = (byte)((inUse >> 8) & 0xffL);
+    rval[10] = (byte)((inUse >> 16) & 0xffL);
+    rval[11] = (byte)((inUse >> 24) & 0xffL);
+    rval[12] = (byte)((inUse >> 32) & 0xffL);
+    rval[13] = (byte)((inUse >> 40) & 0xffL);
+    rval[14] = (byte)((inUse >> 48) & 0xffL);
+    rval[15] = (byte)((inUse >> 56) & 0xffL);
+    return rval; // 16-byte array in the layout read back by unpackTarget() and unpackInUse()
+  }
+
+
 }