You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/18 22:01:02 UTC

svn commit: r1552096 - in /manifoldcf/trunk: ./ framework/core/ framework/core/src/main/java/org/apache/manifoldcf/core/throttler/

Author: kwright
Date: Wed Dec 18 21:01:01 2013
New Revision: 1552096

URL: http://svn.apache.org/r1552096
Log:
Add cross-thread throttling support.  Fix for CONNECTORS-829.

Modified:
    manifoldcf/trunk/   (props changed)
    manifoldcf/trunk/framework/core/pom.xml
    manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java
    manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java
    manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java

Propchange: manifoldcf/trunk/
------------------------------------------------------------------------------
  Merged /manifoldcf/branches/CONNECTORS-829:r1551753-1552095

Modified: manifoldcf/trunk/framework/core/pom.xml
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/core/pom.xml?rev=1552096&r1=1552095&r2=1552096&view=diff
==============================================================================
--- manifoldcf/trunk/framework/core/pom.xml (original)
+++ manifoldcf/trunk/framework/core/pom.xml Wed Dec 18 21:01:01 2013
@@ -92,5 +92,23 @@
       <artifactId>zookeeper</artifactId>
       <version>${zookeeper.version}</version>
     </dependency>
+    <dependency>
+      <groupId>postgresql</groupId>
+      <artifactId>postgresql</artifactId>
+      <version>${postgresql.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hsqldb</groupId>
+      <artifactId>hsqldb</artifactId>
+      <version>${hsqldb.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.derby</groupId>
+      <artifactId>derby</artifactId>
+      <version>${derby.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>

Modified: manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java?rev=1552096&r1=1552095&r2=1552096&view=diff
==============================================================================
--- manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java (original)
+++ manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java Wed Dec 18 21:01:01 2013
@@ -21,6 +21,7 @@ package org.apache.manifoldcf.core.throt
 import org.apache.manifoldcf.core.interfaces.*;
 import org.apache.manifoldcf.core.system.ManifoldCF;
 import java.util.concurrent.atomic.*;
+import java.util.*;
 
 /** Connection tracking for a bin.
 *
@@ -50,8 +51,14 @@ public class ConnectionBin
   protected final String serviceTypeName;
   /** The (anonymous) service name */
   protected final String serviceName;
+  /** The target calculation lock name */
+  protected final String targetCalcLockName;
+  
   /** This is the maximum number of active connections allowed for this bin */
   protected int maxActiveConnections = 0;
+  
+  /** This is the local maximum number of active connections allowed for this bin */
+  protected int localMax = 0;
   /** This is the number of connections in this bin that have been reserved - that is, they
   * are promised to various callers, but those callers have not yet committed to obtaining them. */
   protected int reservedConnections = 0;
@@ -62,22 +69,34 @@ public class ConnectionBin
   /** The service type prefix for connection bins */
   protected final static String serviceTypePrefix = "_CONNECTIONBIN_";
 
+  /** The target calculation lock prefix */
+  protected final static String targetCalcLockPrefix = "_CONNECTIONBINTARGET_";
+  
+  /** Random number */
+  protected final static Random randomNumberGenerator = new Random();
+
   /** Constructor. */
   public ConnectionBin(IThreadContext threadContext, String throttlingGroupName, String binName)
     throws ManifoldCFException
   {
     this.binName = binName;
     this.serviceTypeName = buildServiceTypeName(throttlingGroupName, binName);
+    this.targetCalcLockName = buildTargetCalcLockName(throttlingGroupName, binName);
     // Now, register and activate service anonymously, and record the service name we get.
     ILockManager lockManager = LockManagerFactory.make(threadContext);
     this.serviceName = lockManager.registerServiceBeginServiceActivity(serviceTypeName, null, null);
   }
 
-  protected String buildServiceTypeName(String throttlingGroupName, String binName)
+  protected static String buildServiceTypeName(String throttlingGroupName, String binName)
   {
     return serviceTypePrefix + throttlingGroupName + "_" + binName;
   }
 
+  protected static String buildTargetCalcLockName(String throttlingGroupName, String binName)
+  { // Builds the name of the lock that poll() holds while it computes this bin's target.
+    return targetCalcLockPrefix + throttlingGroupName + "_" + binName; // prefix + throttling group + "_" + bin name
+  }
+  
   /** Get the bin name. */
   public String getBinName()
   {
@@ -88,9 +107,8 @@ public class ConnectionBin
   */
   public synchronized void updateMaxActiveConnections(int maxActiveConnections)
   {
-    // Update the number and wake up any waiting threads; they will take care of everything.
+    // Update the number; the poller will wake up any waiting threads.
     this.maxActiveConnections = maxActiveConnections;
-    notifyAll();
   }
 
   /** Wait for a connection to become available, in the context of an existing connection pool.
@@ -120,7 +138,7 @@ public class ConnectionBin
         poolCount.set(currentPoolCount - 1);
         return IConnectionThrottler.CONNECTION_FROM_POOL;
       }
-      if (inUseConnections + reservedConnections < maxActiveConnections)
+      if (inUseConnections + reservedConnections < localMax)
       {
         reservedConnections++;
         return IConnectionThrottler.CONNECTION_FROM_CREATION;
@@ -166,7 +184,7 @@ public class ConnectionBin
   public synchronized boolean shouldReturnedConnectionBeDestroyed()
   {
     // We don't count reserved connections here because those are not yet committed
-    return inUseConnections > maxActiveConnections;
+    return inUseConnections > localMax;
   }
   
   public static final int CONNECTION_DESTROY = 0;
@@ -189,7 +207,7 @@ public class ConnectionBin
       // return it, and no harm done.
       poolCount.set(currentPoolCount-1);
       // We don't count reserved connections here because those are not yet committed.
-      if (inUseConnections > maxActiveConnections)
+      if (inUseConnections > localMax)
       {
         return CONNECTION_DESTROY;
       }
@@ -216,6 +234,7 @@ public class ConnectionBin
   public synchronized void undoPooledConnectionDecision(AtomicInteger poolCount)
   {
     poolCount.set(poolCount.get() + 1);
+    notifyAll();
   }
   
   /** Note a connection returned to the pool.
@@ -239,7 +258,89 @@ public class ConnectionBin
   public synchronized void poll(IThreadContext threadContext)
     throws ManifoldCFException
   {
-    // MHL
+    // The meat of the cross-cluster apportionment algorithm goes here!
+    // Each service posts two global numbers: "in-use" and "target".  At no time does a service *ever* post a "target"
+    // that, together with all other active service targets, is in excess of the max.  Also, at no time does a service post
+    // a target that, when added to the other "in-use" values, exceeds the max.  If the "in-use" values everywhere else
+    // already equal or exceed the max, then the target will be zero.
+    // The target quota is calculated as follows:
+    // (1) Target is summed, excluding ours.  This is GlobalTarget.
+    // (2) In-use is summed, excluding ours.  This is GlobalInUse.
+    // (3) Our MaximumTarget is computed, which is Maximum - GlobalTarget or Maximum - GlobalInUse, whichever is
+    //     smaller, but never less than zero.
+    // (4) Our FairTarget is computed.  The FairTarget divides the Maximum by the number of services, and adds
+    //     1 randomly based on the remainder.
+    // (5) We compute OptimalTarget as follows: We start with current local target.  If current local target
+    //    exceeds current local in-use count, we adjust OptimalTarget downward by one.  Otherwise we increase it
+    //    by a quarter of Maximum (but at least one), so that it ramps up quickly.
+    // (6) Finally, we compute Target by taking the minimum of MaximumTarget, FairTarget, and OptimalTarget.
+
+    ILockManager lockManager = LockManagerFactory.make(threadContext);
+    lockManager.enterWriteLock(targetCalcLockName);
+    try
+    {
+      // Compute MaximumTarget
+      SumClass sumClass = new SumClass(serviceName);
+      lockManager.scanServiceData(serviceTypeName, sumClass);
+      //System.out.println("numServices = "+sumClass.getNumServices()+"; globalTarget = "+sumClass.getGlobalTarget()+"; globalInUse = "+sumClass.getGlobalInUse());
+        
+      int numServices = sumClass.getNumServices();
+      if (numServices == 0)
+        return;
+      int globalTarget = sumClass.getGlobalTarget();
+      int globalInUse = sumClass.getGlobalInUse();
+      int maximumTarget = maxActiveConnections - globalTarget;
+      if (maximumTarget > maxActiveConnections - globalInUse)
+        maximumTarget = maxActiveConnections - globalInUse;
+      if (maximumTarget < 0)
+        maximumTarget = 0;
+        
+      // Compute FairTarget
+      int fairTarget = maxActiveConnections / numServices;
+      int remainder = maxActiveConnections % numServices;
+      // Randomly choose whether we get an addition to the FairTarget
+      if (randomNumberGenerator.nextInt(numServices) < remainder)
+        fairTarget++;
+        
+      // Compute OptimalTarget
+      int localInUse = inUseConnections;
+      int optimalTarget = localMax;
+      if (localMax > localInUse)
+        optimalTarget--;
+      else
+      {
+        // We want a fast ramp up, so make this proportional to maxActiveConnections
+        int increment = maxActiveConnections >> 2;
+        if (increment == 0)
+          increment = 1;
+        optimalTarget += increment;
+      }
+        
+      //System.out.println("maxTarget = "+maximumTarget+"; fairTarget = "+fairTarget+"; optimalTarget = "+optimalTarget);
+
+      // Now compute actual target
+      int target = maximumTarget;
+      if (target > fairTarget)
+        target = fairTarget;
+      if (target > optimalTarget)
+        target = optimalTarget;
+        
+      // Write these values to the service data variables.
+      // NOTE that there is a race condition here; the target value depends on all the calculations above being accurate, and not changing out from under us.
+      // So, that's why we have a write lock around the pool calculations.
+        
+      lockManager.updateServiceData(serviceTypeName, serviceName, pack(target, localInUse));
+        
+      // Now, update our localMax, if it needs it.
+      if (target == localMax)
+        return;
+      localMax = target;
+      notifyAll();
+    }
+    finally
+    {
+      lockManager.leaveWriteLock(targetCalcLockName);
+    }
   }
 
   /** Shut down the bin, and release everything that is waiting on it.
@@ -252,5 +353,85 @@ public class ConnectionBin
     ILockManager lockManager = LockManagerFactory.make(threadContext);
     lockManager.endServiceActivity(serviceTypeName, serviceName);
   }
+  
+  // Protected classes and methods
+  
+  protected static class SumClass implements IServiceDataAcceptor // Accumulates the target and in-use values posted by the other services.
+  {
+    protected final String serviceName; // The local service's name; its own data is left out of the tallies.
+    protected int numServices = 0; // Every service seen by the scan, the local one included.
+    protected int globalTargetTally = 0; // Sum of the targets posted by the other services.
+    protected int globalInUseTally = 0; // Sum of the in-use counts posted by the other services.
+    
+    public SumClass(String serviceName)
+    {
+      this.serviceName = serviceName;
+    }
+    
+    @Override
+    public boolean acceptServiceData(String serviceName, byte[] serviceData)
+      throws ManifoldCFException
+    {
+      numServices++;
+
+      if (!serviceName.equals(this.serviceName)) // Only other services contribute to the global sums.
+      {
+        globalTargetTally += unpackTarget(serviceData);
+        globalInUseTally += unpackInUse(serviceData);
+      }
+      return false; // NOTE(review): presumably "false" means keep scanning -- confirm against IServiceDataAcceptor.
+    }
+
+    public int getNumServices()
+    {
+      return numServices;
+    }
+    
+    public int getGlobalTarget()
+    {
+      return globalTargetTally;
+    }
+    
+    public int getGlobalInUse()
+    {
+      return globalInUseTally;
+    }
+    
+  }
+  
+  protected static int unpackTarget(byte[] data) // Reads the target value from bytes 0-3 of packed service data.
+  {
+    if (data == null || data.length != 8)
+      return 0; // Absent or wrongly sized data counts as a target of zero.
+    return (((int)data[0]) & 0xff) + // Least significant byte first, mirroring pack().
+      ((((int)data[1]) << 8) & 0xff00) +
+      ((((int)data[2]) << 16) & 0xff0000) +
+      ((((int)data[3]) << 24) & 0xff000000);
+  }
+
+  protected static int unpackInUse(byte[] data) // Reads the in-use count from bytes 4-7 of packed service data.
+  {
+    if (data == null || data.length != 8)
+      return 0; // Absent or wrongly sized data counts as zero connections in use.
+    return (((int)data[4]) & 0xff) + // Least significant byte first, mirroring pack().
+      ((((int)data[5]) << 8) & 0xff00) +
+      ((((int)data[6]) << 16) & 0xff0000) +
+      ((((int)data[7]) << 24) & 0xff000000);
+  }
+
+  protected static byte[] pack(int target, int inUse) // Encodes target and in-use count as 8 bytes of service data.
+  {
+    byte[] rval = new byte[8];
+    rval[0] = (byte)(target & 0xff); // Bytes 0-3: target, least significant byte first.
+    rval[1] = (byte)((target >> 8) & 0xff);
+    rval[2] = (byte)((target >> 16) & 0xff);
+    rval[3] = (byte)((target >> 24) & 0xff);
+    rval[4] = (byte)(inUse & 0xff); // Bytes 4-7: in-use count, least significant byte first.
+    rval[5] = (byte)((inUse >> 8) & 0xff);
+    rval[6] = (byte)((inUse >> 16) & 0xff);
+    rval[7] = (byte)((inUse >> 24) & 0xff);
+    return rval;
+  }
+
 }
 

Modified: manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java?rev=1552096&r1=1552095&r2=1552096&view=diff
==============================================================================
--- manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java (original)
+++ manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java Wed Dec 18 21:01:01 2013
@@ -38,32 +38,48 @@ public class FetchBin
   protected final String serviceTypeName;
   /** The (anonymous) service name */
   protected final String serviceName;
-  /** This is the last time a fetch was done on this bin */
-  protected long lastFetchTime = 0L;
+  /** The target calculation lock name */
+  protected final String targetCalcLockName;
+
   /** This is the minimum time between fetches for this bin, in ms. */
   protected long minTimeBetweenFetches = Long.MAX_VALUE;
+
+  /** The local minimum time between fetches */
+  protected long localMinimum = Long.MAX_VALUE;
+
+  /** This is the last time a fetch was done on this bin */
+  protected long lastFetchTime = 0L;
   /** Is the next fetch reserved? */
   protected boolean reserveNextFetch = false;
 
   /** The service type prefix for fetch bins */
   protected final static String serviceTypePrefix = "_FETCHBIN_";
 
+  /** The target calculation lock prefix */
+  protected final static String targetCalcLockPrefix = "_FETCHBINTARGET_";
+
   /** Constructor. */
   public FetchBin(IThreadContext threadContext, String throttlingGroupName, String binName)
     throws ManifoldCFException
   {
     this.binName = binName;
     this.serviceTypeName = buildServiceTypeName(throttlingGroupName, binName);
+    this.targetCalcLockName = buildTargetCalcLockName(throttlingGroupName, binName);
     // Now, register and activate service anonymously, and record the service name we get.
     ILockManager lockManager = LockManagerFactory.make(threadContext);
     this.serviceName = lockManager.registerServiceBeginServiceActivity(serviceTypeName, null, null);
   }
 
-  protected String buildServiceTypeName(String throttlingGroupName, String binName)
+  protected static String buildServiceTypeName(String throttlingGroupName, String binName)
   {
     return serviceTypePrefix + throttlingGroupName + "_" + binName;
   }
 
+  protected static String buildTargetCalcLockName(String throttlingGroupName, String binName)
+  { // Builds the name of the lock that poll() holds while it computes this bin's target.
+    return targetCalcLockPrefix + throttlingGroupName + "_" + binName; // prefix + throttling group + "_" + bin name
+  }
+
   /** Get the bin name. */
   public String getBinName()
   {
@@ -76,8 +92,6 @@ public class FetchBin
   {
     // Update the number and wake up any waiting threads; they will take care of everything.
     this.minTimeBetweenFetches = minTimeBetweenFetches;
-    // Wake up everything that's waiting.
-    notifyAll();
   }
 
   /** Reserve a request to fetch a document from this bin.  The actual fetch is not yet committed
@@ -109,6 +123,7 @@ public class FetchBin
     if (!reserveNextFetch)
       throw new IllegalStateException("Can't clear a fetch reservation we don't have");
     reserveNextFetch = false;
+    notifyAll();
   }
   
   /** Wait the necessary time to do the fetch.  Presumes we've reserved the next fetch
@@ -126,7 +141,7 @@ public class FetchBin
       if (!isAlive)
         // Leave it to the caller to undo reservations
         return false;
-      if (minTimeBetweenFetches == Long.MAX_VALUE)
+      if (localMinimum == Long.MAX_VALUE)
       {
         // wait forever - but eventually someone will set a smaller interval and wake us up.
         wait();
@@ -135,7 +150,7 @@ public class FetchBin
       {
         long currentTime = System.currentTimeMillis();
         // Compute how long we have to wait, based on the current time and the time of the last fetch.
-        long waitAmt = lastFetchTime + minTimeBetweenFetches - currentTime;
+        long waitAmt = lastFetchTime + localMinimum - currentTime;
         if (waitAmt <= 0L)
         {
           // Note actual time we start the fetch.
@@ -153,7 +168,92 @@ public class FetchBin
   public synchronized void poll(IThreadContext threadContext)
     throws ManifoldCFException
   {
-    // MHL
+    ILockManager lockManager = LockManagerFactory.make(threadContext);
+    lockManager.enterWriteLock(targetCalcLockName);
+    try
+    {
+      // This is where the cross-cluster logic happens.
+      // Each service records the following information:
+      // -- the target rate, in fetches per millisecond
+      // -- the earliest possible time for the service's next fetch, in ms from start of epoch
+      // Target rates are apportioned in fetches-per-ms space, as follows:
+      // (1) Target rate is summed cross-cluster, excluding our local service.  This is GlobalTarget.
+      // (2) MaximumTarget is computed, which is Maximum-GlobalTarget.
+      // (3) FairTarget is computed, which is Maximum/numServices (rates are fractional, so no remainder is handed out).
+      // (4) Finally, we compute Target rate by taking the minimum of MaximumTarget, FairTarget.
+      // The earliest time for the next fetch is computed as follows:
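+      // (1) Find the EARLIEST time among those posted by other services and the local last fetch time (or use
+      //   the current time if no local fetch has happened yet).  NOTE(review): confirm EARLIEST, not LATEST, is intended.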
+      // (2) Compute the next possible fetch time, using the Target rate and that fetch time.
+      // (3) The new targeted fetch time will be set to that value.
+
+      SumClass sumClass = new SumClass(serviceName);
+      lockManager.scanServiceData(serviceTypeName, sumClass);
+
+      int numServices = sumClass.getNumServices();
+      if (numServices == 0)
+        return;
+      double globalTarget = sumClass.getGlobalTarget();
+      long earliestTargetTime = sumClass.getEarliestTime();
+      long currentTime = System.currentTimeMillis();
+      
+      if (lastFetchTime == 0L)
+        earliestTargetTime = currentTime;
+      else if (earliestTargetTime > lastFetchTime)
+        earliestTargetTime = lastFetchTime;
+      
+      // Now, compute the target rate
+      double globalMaxFetchesPerMillisecond;
+      double maximumTarget;
+      double fairTarget;
+      if (minTimeBetweenFetches == 0.0)
+      {
+        //System.out.println(binName+":Global minimum time between fetches = 0");
+        globalMaxFetchesPerMillisecond = Double.MAX_VALUE;
+        maximumTarget = globalMaxFetchesPerMillisecond;
+        fairTarget = globalMaxFetchesPerMillisecond;
+      }
+      else
+      {
+        globalMaxFetchesPerMillisecond = 1.0 / minTimeBetweenFetches;
+        //System.out.println(binName+":Global max fetches per millisecond = "+globalMaxFetchesPerMillisecond);
+        maximumTarget = globalMaxFetchesPerMillisecond - globalTarget;
+        if (maximumTarget < 0.0)
+          maximumTarget = 0.0;
+
+        // Compute FairTarget
+        fairTarget = globalMaxFetchesPerMillisecond / numServices;
+      }
+
+      // Now compute actual target
+      double inverseTarget = maximumTarget;
+      if (inverseTarget > fairTarget)
+        inverseTarget = fairTarget;
+
+      long target;
+      if (inverseTarget == 0.0)
+        target = Long.MAX_VALUE;
+      else
+        target = (long)(1.0/inverseTarget +0.5);
+      
+      long nextFetchTime = earliestTargetTime + target;
+      
+      lockManager.updateServiceData(serviceTypeName, serviceName, pack(inverseTarget, nextFetchTime));
+
+      // Update local parameters: the rate, and the next time.
+      // But in order to update the next time, we have to update the last time.
+      if (target == localMinimum && earliestTargetTime == lastFetchTime)
+        return;
+      //System.out.println(binName+":Setting localMinimum="+target+"; last fetch time="+earliestTargetTime);
+      localMinimum = target;
+      lastFetchTime = earliestTargetTime;
+      notifyAll();
+    }
+    finally
+    {
+      lockManager.leaveWriteLock(targetCalcLockName);
+    }
+
   }
 
   /** Shut the bin down, and wake up all threads waiting on it.
@@ -166,5 +266,103 @@ public class FetchBin
     ILockManager lockManager = LockManagerFactory.make(threadContext);
     lockManager.endServiceActivity(serviceTypeName, serviceName);
   }
+  
+  // Protected classes and methods
+  
+  protected static class SumClass implements IServiceDataAcceptor // Accumulates the target rates and earliest time posted by the other services.
+  {
+    protected final String serviceName; // The local service's name; its own data is left out of the tallies.
+    protected int numServices = 0; // Every service seen by the scan, the local one included.
+    protected double globalTargetTally = 0; // Sum of the target rates posted by the other services.
+    protected long earliestTime = Long.MAX_VALUE; // Smallest time posted by another service; stays MAX_VALUE if none is found.
+    
+    public SumClass(String serviceName)
+    {
+      this.serviceName = serviceName;
+    }
+    
+    @Override
+    public boolean acceptServiceData(String serviceName, byte[] serviceData)
+      throws ManifoldCFException
+    {
+      numServices++;
+
+      if (!serviceName.equals(this.serviceName)) // Only other services contribute to the sum and the minimum.
+      {
+        globalTargetTally += unpackTarget(serviceData);
+        long checkTime = unpackEarliestTime(serviceData);
+        if (checkTime < earliestTime)
+          earliestTime = checkTime;
+      }
+      return false; // NOTE(review): presumably "false" means keep scanning -- confirm against IServiceDataAcceptor.
+    }
+
+    public int getNumServices()
+    {
+      return numServices;
+    }
+    
+    public double getGlobalTarget()
+    {
+      return globalTargetTally;
+    }
+    
+    public long getEarliestTime()
+    {
+      return earliestTime;
+    }
+  }
+
+  protected static double unpackTarget(byte[] data) // Reads the target rate (a double) from bytes 0-7 of packed service data.
+  {
+    if (data == null || data.length != 16) // pack() emits 16 bytes: target rate in [0..7], earliest time in [8..15].
+      return 0.0; // Absent or wrongly sized data contributes no target rate.
+    return Double.longBitsToDouble((((long)data[0]) & 0xffL) + // Least significant byte first, mirroring pack().
+      ((((long)data[1]) << 8) & 0xff00L) +
+      ((((long)data[2]) << 16) & 0xff0000L) +
+      ((((long)data[3]) << 24) & 0xff000000L) +
+      ((((long)data[4]) << 32) & 0xff00000000L) +
+      ((((long)data[5]) << 40) & 0xff0000000000L) +
+      ((((long)data[6]) << 48) & 0xff000000000000L) +
+      ((((long)data[7]) << 56) & 0xff00000000000000L));
+  }
+
+  protected static long unpackEarliestTime(byte[] data) // Reads the time value from bytes 8-15 of packed service data.
+  {
+    if (data == null || data.length != 16)
+      return Long.MAX_VALUE; // Absent or wrongly sized data never wins a minimum comparison.
+    return (((long)data[8]) & 0xffL) + // Least significant byte first, mirroring pack().
+      ((((long)data[9]) << 8) & 0xff00L) +
+      ((((long)data[10]) << 16) & 0xff0000L) +
+      ((((long)data[11]) << 24) & 0xff000000L) +
+      ((((long)data[12]) << 32) & 0xff00000000L) +
+      ((((long)data[13]) << 40) & 0xff0000000000L) +
+      ((((long)data[14]) << 48) & 0xff000000000000L) +
+      ((((long)data[15]) << 56) & 0xff00000000000000L);
+  }
+
+  protected static byte[] pack(double targetDouble, long earliestTime) // Encodes target rate and time as 16 bytes of service data.
+  {
+    long target = Double.doubleToLongBits(targetDouble); // The double travels as its raw 64-bit pattern.
+    byte[] rval = new byte[16];
+    rval[0] = (byte)(target & 0xffL); // Bytes 0-7: target rate bits, least significant byte first.
+    rval[1] = (byte)((target >> 8) & 0xffL);
+    rval[2] = (byte)((target >> 16) & 0xffL);
+    rval[3] = (byte)((target >> 24) & 0xffL);
+    rval[4] = (byte)((target >> 32) & 0xffL);
+    rval[5] = (byte)((target >> 40) & 0xffL);
+    rval[6] = (byte)((target >> 48) & 0xffL);
+    rval[7] = (byte)((target >> 56) & 0xffL);
+    rval[8] = (byte)(earliestTime & 0xffL); // Bytes 8-15: time value, least significant byte first.
+    rval[9] = (byte)((earliestTime >> 8) & 0xffL);
+    rval[10] = (byte)((earliestTime >> 16) & 0xffL);
+    rval[11] = (byte)((earliestTime >> 24) & 0xffL);
+    rval[12] = (byte)((earliestTime >> 32) & 0xffL);
+    rval[13] = (byte)((earliestTime >> 40) & 0xffL);
+    rval[14] = (byte)((earliestTime >> 48) & 0xffL);
+    rval[15] = (byte)((earliestTime >> 56) & 0xffL);
+    return rval;
+  }
+
 }
 

Modified: manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java?rev=1552096&r1=1552095&r2=1552096&view=diff
==============================================================================
--- manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java (original)
+++ manifoldcf/trunk/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java Wed Dec 18 21:01:01 2013
@@ -20,6 +20,7 @@ package org.apache.manifoldcf.core.throt
 
 import org.apache.manifoldcf.core.interfaces.*;
 import org.apache.manifoldcf.core.system.ManifoldCF;
+import java.util.*;
 
 /** Throttles for a bin.
 * An instance of this class keeps track of the information needed to bandwidth throttle access
@@ -75,6 +76,15 @@ public class ThrottleBin
   protected final String serviceTypeName;
   /** The (anonymous) service name */
   protected final String serviceName;
+  /** The target calculation lock name */
+  protected final String targetCalcLockName;
+
+  /** The minimum milliseconds per byte */
+  protected double minimumMillisecondsPerByte = Double.MAX_VALUE;
+
+  /** The local minimum milliseconds per byte */
+  protected double localMinimum = Double.MAX_VALUE;
+  
   /** This is the reference count for this bin (which records active references) */
   protected volatile int refCount = 0;
   /** The inverse rate estimate of the first fetch, in ms/byte */
@@ -87,11 +97,12 @@ public class ThrottleBin
   protected long seriesStartTime = -1L;
   /** Total actual bytes read in this series; this includes fetches in progress */
   protected long totalBytesRead = -1L;
-  /** The minimum milliseconds per byte */
-  protected double minimumMillisecondsPerByte = Double.MAX_VALUE;
-  
+
   /** The service type prefix for throttle bins */
   protected final static String serviceTypePrefix = "_THROTTLEBIN_";
+  
+  /** The target calculation lock prefix */
+  protected final static String targetCalcLockPrefix = "_THROTTLEBINTARGET_";
 
   /** Constructor. */
   public ThrottleBin(IThreadContext threadContext, String throttlingGroupName, String binName)
@@ -99,16 +110,22 @@ public class ThrottleBin
   {
     this.binName = binName;
     this.serviceTypeName = buildServiceTypeName(throttlingGroupName, binName);
+    this.targetCalcLockName = buildTargetCalcLockName(throttlingGroupName, binName);
     // Now, register and activate service anonymously, and record the service name we get.
     ILockManager lockManager = LockManagerFactory.make(threadContext);
     this.serviceName = lockManager.registerServiceBeginServiceActivity(serviceTypeName, null, null);
   }
 
-  protected String buildServiceTypeName(String throttlingGroupName, String binName)
+  protected static String buildServiceTypeName(String throttlingGroupName, String binName)
   {
     return serviceTypePrefix + throttlingGroupName + "_" + binName;
   }
   
+  protected static String buildTargetCalcLockName(String throttlingGroupName, String binName)
+  { // Builds the name of the lock that poll() holds while it computes this bin's target.
+    return targetCalcLockPrefix + throttlingGroupName + "_" + binName; // prefix + throttling group + "_" + bin name
+  }
+
   /** Get the bin name. */
   public String getBinName()
   {
@@ -116,13 +133,9 @@ public class ThrottleBin
   }
 
   /** Update minimumMillisecondsPerBytePerServer */
-  public void updateMinimumMillisecondsPerByte(double min)
+  public synchronized void updateMinimumMillisecondsPerByte(double min)
   {
-    synchronized (this)
-    {
-      this.minimumMillisecondsPerByte = min;
-      notifyAll();
-    }
+    this.minimumMillisecondsPerByte = min;
   }
   
   /** Note the start of a fetch operation for a bin.  Call this method just before the actual stream access begins.
@@ -190,7 +203,7 @@ public class ThrottleBin
         }
 
         // If we haven't set a proper throttle yet, wait until we do.
-        if (minimumMillisecondsPerByte == Double.MAX_VALUE)
+        if (localMinimum == Double.MAX_VALUE)
         {
           wait();
           continue;
@@ -200,7 +213,7 @@ public class ThrottleBin
         long estimatedTime = (long)(rateEstimate * (double)byteCount);
 
         // Figure out how long the total byte count should take, to meet the constraint
-        long desiredEndTime = seriesStartTime + (long)(((double)(totalBytesRead + (long)byteCount)) * minimumMillisecondsPerByte);
+        long desiredEndTime = seriesStartTime + (long)(((double)(totalBytesRead + (long)byteCount)) * localMinimum);
 
 
         // The wait time is the difference between our desired end time, minus the estimated time to read the data, and the
@@ -273,7 +286,85 @@ public class ThrottleBin
   public synchronized void poll(IThreadContext threadContext)
     throws ManifoldCFException
   {
-    // MHL
+    
+    // Enter write lock
+    ILockManager lockManager = LockManagerFactory.make(threadContext);
+    lockManager.enterWriteLock(targetCalcLockName);
+    try
+    {
+      // The cross-cluster apportionment of byte fetching goes here.
+      // For byte-rate throttling, the apportioning algorithm is simple.  First, it's done
+      // in bytes per millisecond, which is the inverse of what we actually use for the
+      // rest of this class.  Each service posts a single value: its target for the maximum
+      // bytes per millisecond.
+      // The target value is computed as follows:
+      // (1) Target is summed cross-cluster, excluding our local service.  This is GlobalTarget.
+      // (2) MaximumTarget is computed, which is Maximum-GlobalTarget.
+      // (3) FairTarget is computed, which is Maximum/numServices (rates are fractional, so no remainder is handed out).
+      // (4) Finally, we compute Target by taking the minimum of MaximumTarget, FairTarget.
+
+      // Compute MaximumTarget
+      SumClass sumClass = new SumClass(serviceName);
+      lockManager.scanServiceData(serviceTypeName, sumClass);
+        
+      int numServices = sumClass.getNumServices();
+      if (numServices == 0)
+        return;
+      double globalTarget = sumClass.getGlobalTarget();
+      double globalMaxBytesPerMillisecond;
+      double maximumTarget;
+      double fairTarget;
+      if (minimumMillisecondsPerByte == 0.0)
+      {
+        //System.out.println(binName+":Global minimum milliseconds per byte = 0.0");
+        globalMaxBytesPerMillisecond = Double.MAX_VALUE;
+        maximumTarget = globalMaxBytesPerMillisecond;
+        fairTarget = globalMaxBytesPerMillisecond;
+      }
+      else
+      {
+        globalMaxBytesPerMillisecond = 1.0 / minimumMillisecondsPerByte;
+        //System.out.println(binName+":Global max bytes per millisecond = "+globalMaxBytesPerMillisecond);
+        maximumTarget = globalMaxBytesPerMillisecond - globalTarget;
+        if (maximumTarget < 0.0)
+          maximumTarget = 0.0;
+
+        // Compute FairTarget
+        fairTarget = globalMaxBytesPerMillisecond / numServices;
+      }
+
+      // Now compute actual target
+      double inverseTarget = maximumTarget;
+      if (inverseTarget > fairTarget)
+        inverseTarget = fairTarget;
+
+      //System.out.println(binName+":Inverse target = "+inverseTarget+"; maximumTarget = "+maximumTarget+"; fairTarget = "+fairTarget);
+      
+      // Write these values to the service data variables.
+      // NOTE that there is a race condition here; the target value depends on all the calculations above being accurate, and not changing out from under us.
+      // So, that's why we have a write lock around the pool calculations.
+        
+      lockManager.updateServiceData(serviceTypeName, serviceName, pack(inverseTarget));
+
+      // Update our local minimum.
+      double target;
+      if (inverseTarget == 0.0)
+        target = Double.MAX_VALUE;
+      else
+        target = 1.0 / inverseTarget;
+      
+      // Reset local minimum, if it has changed.
+      if (target == localMinimum)
+        return;
+      //System.out.println(binName+":Updating local minimum to "+target);
+      localMinimum = target;
+      notifyAll();
+    }
+    finally
+    {
+      lockManager.leaveWriteLock(targetCalcLockName);
+    }
+
   }
 
   /** Shut down this bin.
@@ -286,5 +377,74 @@ public class ThrottleBin
     ILockManager lockManager = LockManagerFactory.make(threadContext);
     lockManager.endServiceActivity(serviceTypeName, serviceName);
   }
+  
+  // Protected classes and methods
+  
+  protected static class SumClass implements IServiceDataAcceptor // Accumulates the target rates posted by the other services.
+  {
+    protected final String serviceName; // The local service's name; its own data is left out of the tally.
+    protected int numServices = 0; // Every service seen by the scan, the local one included.
+    protected double globalTargetTally = 0; // Sum of the target rates posted by the other services.
+    
+    public SumClass(String serviceName)
+    {
+      this.serviceName = serviceName;
+    }
+    
+    @Override
+    public boolean acceptServiceData(String serviceName, byte[] serviceData)
+      throws ManifoldCFException
+    {
+      numServices++;
+
+      if (!serviceName.equals(this.serviceName)) // Only other services contribute to the global sum.
+      {
+        globalTargetTally += unpackTarget(serviceData);
+      }
+      return false; // NOTE(review): presumably "false" means keep scanning -- confirm against IServiceDataAcceptor.
+    }
+
+    public int getNumServices()
+    {
+      return numServices;
+    }
+    
+    public double getGlobalTarget()
+    {
+      return globalTargetTally;
+    }
+    
+  }
+  
+  protected static double unpackTarget(byte[] data) // Reads the target rate (a double) from 8 bytes of packed service data.
+  {
+    if (data == null || data.length != 8)
+      return 0.0; // Absent or wrongly sized data contributes no target rate.
+    return Double.longBitsToDouble((((long)data[0]) & 0xffL) + // Least significant byte first, mirroring pack().
+      ((((long)data[1]) << 8) & 0xff00L) +
+      ((((long)data[2]) << 16) & 0xff0000L) +
+      ((((long)data[3]) << 24) & 0xff000000L) +
+      ((((long)data[4]) << 32) & 0xff00000000L) +
+      ((((long)data[5]) << 40) & 0xff0000000000L) +
+      ((((long)data[6]) << 48) & 0xff000000000000L) +
+      ((((long)data[7]) << 56) & 0xff00000000000000L));
+  }
+
+  protected static byte[] pack(double targetDouble) // Encodes the target rate as 8 bytes of service data.
+  {
+    long target = Double.doubleToLongBits(targetDouble); // The double travels as its raw 64-bit pattern.
+    byte[] rval = new byte[8];
+    rval[0] = (byte)(target & 0xffL); // Least significant byte first.
+    rval[1] = (byte)((target >> 8) & 0xffL);
+    rval[2] = (byte)((target >> 16) & 0xffL);
+    rval[3] = (byte)((target >> 24) & 0xffL);
+    rval[4] = (byte)((target >> 32) & 0xffL);
+    rval[5] = (byte)((target >> 40) & 0xffL);
+    rval[6] = (byte)((target >> 48) & 0xffL);
+    rval[7] = (byte)((target >> 56) & 0xffL);
+    return rval;
+  }
+
+
 }