You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/18 20:54:38 UTC

svn commit: r1552075 - in /manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler: FetchBin.java ThrottleBin.java

Author: kwright
Date: Wed Dec 18 19:54:38 2013
New Revision: 1552075

URL: http://svn.apache.org/r1552075
Log:
Add fetch bin throttling, which does not yet pass tests.

Modified:
    manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java
    manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java

Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java?rev=1552075&r1=1552074&r2=1552075&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java Wed Dec 18 19:54:38 2013
@@ -171,12 +171,80 @@ public class FetchBin
     lockManager.enterWriteLock(targetCalcLockName);
     try
     {
-      // MHL
-      long target = minTimeBetweenFetches;
-      // MHL
-      if (target == localMinimum)
+      // This is where the cross-cluster logic happens.
+      // Each service records the following information:
+      // -- the target rate, in fetches per millisecond
+      // -- the earliest possible time for the service's next fetch, in ms from start of epoch
+      // Target rates are apportioned in fetches-per-ms space, as follows:
+      // (1) Target rate is summed cross-cluster, excluding our local service.  This is GlobalTarget.
+      // (2) MaximumTarget is computed, which is Maximum-GlobalTarget.
+      // (3) FairTarget is computed, which is Maximum/numServices + rand(Maximum%numServices).
+      // (4) Finally, we compute Target rate by taking the minimum of MaximumTarget, FairTarget.
+      // The earliest time for the next fetch is computed as follows:
+      // (1) Find the LATEST most recent fetch time across the services, including an updated time for
+      //   the local service.
+      // (2) Compute the next possible fetch time, using the Target rate and that fetch time.
+      // (3) The new targeted fetch time will be set to that value.
+
+      SumClass sumClass = new SumClass(serviceName);
+      lockManager.scanServiceData(serviceTypeName, sumClass);
+
+      int numServices = sumClass.getNumServices();
+      if (numServices == 0)
+        return;
+      double globalTarget = sumClass.getGlobalTarget();
+      long earliestTargetTime = sumClass.getEarliestTime();
+      long currentTime = System.currentTimeMillis();
+      
+      if (lastFetchTime == 0L)
+        earliestTargetTime = currentTime;
+      else if (earliestTargetTime > lastFetchTime)
+        earliestTargetTime = lastFetchTime;
+      
+      // Now, compute the target rate
+      double globalMaxFetchesPerMillisecond;
+      double maximumTarget;
+      double fairTarget;
+      if (minTimeBetweenFetches == 0.0)
+      {
+        //System.out.println(binName+":Global minimum milliseconds per byte = 0.0");
+        globalMaxFetchesPerMillisecond = Double.MAX_VALUE;
+        maximumTarget = globalMaxFetchesPerMillisecond;
+        fairTarget = globalMaxFetchesPerMillisecond;
+      }
+      else
+      {
+        globalMaxFetchesPerMillisecond = 1.0 / minTimeBetweenFetches;
+        //System.out.println(binName+":Global max bytes per millisecond = "+globalMaxBytesPerMillisecond);
+        maximumTarget = globalMaxFetchesPerMillisecond - globalTarget;
+        if (maximumTarget < 0.0)
+          maximumTarget = 0.0;
+
+        // Compute FairTarget
+        fairTarget = globalMaxFetchesPerMillisecond / numServices;
+      }
+
+      // Now compute actual target
+      double inverseTarget = maximumTarget;
+      if (inverseTarget > fairTarget)
+        inverseTarget = fairTarget;
+
+      long target;
+      if (inverseTarget == 0.0)
+        target = Long.MAX_VALUE;
+      else
+        target = (long)(1.0/inverseTarget +0.5);
+      
+      long nextFetchTime = earliestTargetTime + target;
+      
+      lockManager.updateServiceData(serviceTypeName, serviceName, pack(inverseTarget, nextFetchTime));
+
+      // Update local parameters: the rate, and the next time.
+      // But in order to update the next time, we have to update the last time.
+      if (target == localMinimum && earliestTargetTime == lastFetchTime)
         return;
       localMinimum = target;
+      lastFetchTime = earliestTargetTime;
       notifyAll();
     }
     finally
@@ -196,5 +264,103 @@ public class FetchBin
     ILockManager lockManager = LockManagerFactory.make(threadContext);
     lockManager.endServiceActivity(serviceTypeName, serviceName);
   }
+  
+  // Protected classes and methods
+  
+  protected static class SumClass implements IServiceDataAcceptor
+  {
+    protected final String serviceName;
+    protected int numServices = 0;
+    protected double globalTargetTally = 0;
+    protected long earliestTime = Long.MAX_VALUE;
+    
+    public SumClass(String serviceName)
+    {
+      this.serviceName = serviceName;
+    }
+    
+    @Override
+    public boolean acceptServiceData(String serviceName, byte[] serviceData)
+      throws ManifoldCFException
+    {
+      numServices++;
+
+      if (!serviceName.equals(this.serviceName))
+      {
+        globalTargetTally += unpackTarget(serviceData);
+        long checkTime = unpackEarliestTime(serviceData);
+        if (checkTime < earliestTime)
+          earliestTime = checkTime;
+      }
+      return false;
+    }
+
+    public int getNumServices()
+    {
+      return numServices;
+    }
+    
+    public double getGlobalTarget()
+    {
+      return globalTargetTally;
+    }
+    
+    public long getEarliestTime()
+    {
+      return earliestTime;
+    }
+  }
+
+  protected static double unpackTarget(byte[] data)
+  {
+    if (data == null || data.length != 8)
+      return 0.0;
+    return Double.longBitsToDouble((((long)data[0]) & 0xffL) +
+      ((((long)data[1]) << 8) & 0xff00L) +
+      ((((long)data[2]) << 16) & 0xff0000L) +
+      ((((long)data[3]) << 24) & 0xff000000L) +
+      ((((long)data[4]) << 32) & 0xff00000000L) +
+      ((((long)data[5]) << 40) & 0xff0000000000L) +
+      ((((long)data[6]) << 48) & 0xff000000000000L) +
+      ((((long)data[7]) << 56) & 0xff00000000000000L));
+  }
+
+  protected static long unpackEarliestTime(byte[] data)
+  {
+    if (data == null || data.length != 16)
+      return Long.MAX_VALUE;
+    return (((long)data[8]) & 0xffL) +
+      ((((long)data[9]) << 8) & 0xff00L) +
+      ((((long)data[10]) << 16) & 0xff0000L) +
+      ((((long)data[11]) << 24) & 0xff000000L) +
+      ((((long)data[12]) << 32) & 0xff00000000L) +
+      ((((long)data[13]) << 40) & 0xff0000000000L) +
+      ((((long)data[14]) << 48) & 0xff000000000000L) +
+      ((((long)data[15]) << 56) & 0xff00000000000000L);
+  }
+
+  protected static byte[] pack(double targetDouble, long earliestTime)
+  {
+    long target = Double.doubleToLongBits(targetDouble);
+    byte[] rval = new byte[16];
+    rval[0] = (byte)(target & 0xffL);
+    rval[1] = (byte)((target >> 8) & 0xffL);
+    rval[2] = (byte)((target >> 16) & 0xffL);
+    rval[3] = (byte)((target >> 24) & 0xffL);
+    rval[4] = (byte)((target >> 32) & 0xffL);
+    rval[5] = (byte)((target >> 40) & 0xffL);
+    rval[6] = (byte)((target >> 48) & 0xffL);
+    rval[7] = (byte)((target >> 56) & 0xffL);
+    rval[8] = (byte)(earliestTime & 0xffL);
+    rval[9] = (byte)((earliestTime >> 8) & 0xffL);
+    rval[10] = (byte)((earliestTime >> 16) & 0xffL);
+    rval[11] = (byte)((earliestTime >> 24) & 0xffL);
+    rval[12] = (byte)((earliestTime >> 32) & 0xffL);
+    rval[13] = (byte)((earliestTime >> 40) & 0xffL);
+    rval[14] = (byte)((earliestTime >> 48) & 0xffL);
+    rval[15] = (byte)((earliestTime >> 56) & 0xffL);
+    return rval;
+  }
+
 }
 

Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java?rev=1552075&r1=1552074&r2=1552075&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java Wed Dec 18 19:54:38 2013
@@ -299,16 +299,10 @@ public class ThrottleBin
       // per millisecond, and a target value for the same.
       // The target value is computed as follows:
       // (1) Target is summed cross-cluster, excluding our local service.  This is GlobalTarget.
-      // (2) In-use is summed cross-cluster, excluding our local service.  This is GlobalInUse.
-      // (3) MaximumTarget is computed, which is min(Maximum-GlobalTarget,Maximum-GlobalInUse).
-      // (4) FairTarget is computed, which is Maximum/numServices + rand(Maximum%numServices).
-      // (5) Finally, we compute Target by taking the minimum of MaximumTarget, FairTarget.
-
-      // The tricky part of all this is computing the local in-use value.  Ideally, this would be
-      // the instantaneous value *right now*.  But we can approximate this by computing the
-      // number of bytes fetched since the last polling call, divided by the milliseconds elapsed
-      // since then.
-      
+      // (2) MaximumTarget is computed, which is Maximum-GlobalTarget.
+      // (3) FairTarget is computed, which is Maximum/numServices + rand(Maximum%numServices).
+      // (4) Finally, we compute Target by taking the minimum of MaximumTarget, FairTarget.
+
       // Compute MaximumTarget
       SumClass sumClass = new SumClass(serviceName);
       lockManager.scanServiceData(serviceTypeName, sumClass);
@@ -425,7 +419,7 @@ public class ThrottleBin
   protected static double unpackTarget(byte[] data)
   {
     if (data == null || data.length != 8)
-      return 0;
+      return 0.0;
     return Double.longBitsToDouble((((long)data[0]) & 0xffL) +
       ((((long)data[1]) << 8) & 0xff00L) +
       ((((long)data[2]) << 16) & 0xff0000L) +