You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/18 06:00:59 UTC

svn commit: r1551842 - /manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java

Author: kwright
Date: Wed Dec 18 05:00:59 2013
New Revision: 1551842

URL: http://svn.apache.org/r1551842
Log:
Add cross-cluster connection pool limits

Modified:
    manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java

Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java?rev=1551842&r1=1551841&r2=1551842&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java Wed Dec 18 05:00:59 2013
@@ -21,6 +21,7 @@ package org.apache.manifoldcf.core.throt
 import org.apache.manifoldcf.core.interfaces.*;
 import org.apache.manifoldcf.core.system.ManifoldCF;
 import java.util.concurrent.atomic.*;
+import java.util.*;
 
 /** Connection tracking for a bin.
 *
@@ -50,8 +51,14 @@ public class ConnectionBin
   protected final String serviceTypeName;
   /** The (anonymous) service name */
   protected final String serviceName;
+  /** The target calculation lock name */
+  protected final String targetCalcLockName;
+  
   /** This is the maximum number of active connections allowed for this bin */
   protected int maxActiveConnections = 0;
+  
+  /** This is the local maximum number of active connections allowed for this bin */
+  protected int localMax = 0;
   /** This is the number of connections in this bin that have been reserved - that is, they
   * are promised to various callers, but those callers have not yet committed to obtaining them. */
   protected int reservedConnections = 0;
@@ -62,22 +69,34 @@ public class ConnectionBin
   /** The service type prefix for connection bins */
   protected final static String serviceTypePrefix = "_CONNECTIONBIN_";
 
+  /** The target calculation lock prefix */
+  protected final static String targetCalcLockPrefix = "_CONNECTIONBINTARGET_";
+  
+  /** Random number */
+  protected final static Random randomNumberGenerator = new Random();
+
   /** Constructor. */
   public ConnectionBin(IThreadContext threadContext, String throttlingGroupName, String binName)
     throws ManifoldCFException
   {
     this.binName = binName;
     this.serviceTypeName = buildServiceTypeName(throttlingGroupName, binName);
+    this.targetCalcLockName = buildTargetCalcLockName(throttlingGroupName, binName); // name of the lock poll() holds around the target calculation
     // Now, register and activate service anonymously, and record the service name we get.
     ILockManager lockManager = LockManagerFactory.make(threadContext);
     this.serviceName = lockManager.registerServiceBeginServiceActivity(serviceTypeName, null, null);
   }
 
-  protected String buildServiceTypeName(String throttlingGroupName, String binName)
+  protected static String buildServiceTypeName(String throttlingGroupName, String binName) // static: depends only on its arguments and the class constant serviceTypePrefix
   {
     return serviceTypePrefix + throttlingGroupName + "_" + binName;
   }
 
+  protected static String buildTargetCalcLockName(String throttlingGroupName, String binName) // builds the name of the lock that poll() takes while computing targets
+  {
+    return targetCalcLockPrefix + throttlingGroupName + "_" + binName;
+  }
+  
   /** Get the bin name. */
   public String getBinName()
   {
@@ -88,9 +107,8 @@ public class ConnectionBin
   */
   public synchronized void updateMaxActiveConnections(int maxActiveConnections)
   {
-    // Update the number and wake up any waiting threads; they will take care of everything.
+    // Only record the new maximum here; poll() derives localMax from it and calls notifyAll() when localMax changes.
     this.maxActiveConnections = maxActiveConnections;
-    notifyAll();
   }
 
   /** Wait for a connection to become available, in the context of an existing connection pool.
@@ -120,7 +138,7 @@ public class ConnectionBin
         poolCount.set(currentPoolCount - 1);
         return IConnectionThrottler.CONNECTION_FROM_POOL;
       }
-      if (inUseConnections + reservedConnections < maxActiveConnections)
+      if (inUseConnections + reservedConnections < localMax)
       {
         reservedConnections++;
         return IConnectionThrottler.CONNECTION_FROM_CREATION;
@@ -166,7 +184,7 @@ public class ConnectionBin
   public synchronized boolean shouldReturnedConnectionBeDestroyed()
   {
     // We don't count reserved connections here because those are not yet committed
-    return inUseConnections > maxActiveConnections;
+    return inUseConnections > localMax; // localMax is the local quota last computed by poll()
   }
   
   public static final int CONNECTION_DESTROY = 0;
@@ -189,7 +207,7 @@ public class ConnectionBin
       // return it, and no harm done.
       poolCount.set(currentPoolCount-1);
       // We don't count reserved connections here because those are not yet committed.
-      if (inUseConnections > maxActiveConnections)
+      if (inUseConnections > localMax)
       {
         return CONNECTION_DESTROY;
       }
@@ -239,7 +257,89 @@ public class ConnectionBin
   public synchronized void poll(IThreadContext threadContext)
     throws ManifoldCFException
   {
-    // MHL
+    // The meat of the cross-cluster apportionment algorithm goes here!
+    // Each service posts two global numbers: "in-use" and "target".  At no time does a service *ever* post a "target"
+    // that, together with all other active service targets, is in excess of the max.  Also, at no time does a service post
+    // a target that, when added to the other "in-use" values, exceeds the max.  If the "in-use" values everywhere else
+    // already equal or exceed the max, then the target will be zero.
+    // The target quota is calculated as follows:
+    // (1) Target is summed, excluding ours.  This is GlobalTarget.
+    // (2) In-use is summed, excluding ours.  This is GlobalInUse.
+    // (3) Our MaximumTarget is computed, which is Maximum - GlobalTarget or Maximum - GlobalInUse, whichever is
+    //     smaller, but never less than zero.
+    // (4) Our FairTarget is computed.  The FairTarget divides the Maximum by the number of services, and adds
+    //     1 randomly based on the remainder.
+    // (5) We compute OptimalTarget as follows: We start with current local target.  If current local target
+    //    exceeds current local in-use count, we adjust OptimalTarget downward by one.  Otherwise we increase it
+    //    by Maximum/4 (or by one if that quotient is zero), so that ramp-up is fast.
+    // (6) Finally, we compute Target by taking the minimum of MaximumTarget, FairTarget, and OptimalTarget.
+
+    ILockManager lockManager = LockManagerFactory.make(threadContext);
+    lockManager.enterWriteLock(targetCalcLockName);
+    try
+    {
+      // Compute MaximumTarget
+      SumClass sumClass = new SumClass(serviceName);
+      lockManager.scanServiceData(serviceTypeName, sumClass);
+      //System.out.println("numServices = "+sumClass.getNumServices()+"; globalTarget = "+sumClass.getGlobalTarget()+"; globalInUse = "+sumClass.getGlobalInUse());
+        
+      int numServices = sumClass.getNumServices();
+      if (numServices == 0)
+        return; // no services were reported by the scan; this also avoids dividing by zero below
+      int globalTarget = sumClass.getGlobalTarget();
+      int globalInUse = sumClass.getGlobalInUse();
+      int maximumTarget = maxActiveConnections - globalTarget;
+      if (maximumTarget > maxActiveConnections - globalInUse)
+        maximumTarget = maxActiveConnections - globalInUse;
+      if (maximumTarget < 0)
+        maximumTarget = 0;
+        
+      // Compute FairTarget
+      int fairTarget = maxActiveConnections / numServices;
+      int remainder = maxActiveConnections % numServices;
+      // Randomly choose whether we get an addition to the FairTarget
+      if (randomNumberGenerator.nextInt(numServices) < remainder)
+        fairTarget++;
+        
+      // Compute OptimalTarget
+      int localInUse = inUseConnections;
+      int optimalTarget = localMax;
+      if (localMax > localInUse)
+        optimalTarget--;
+      else
+      {
+        // We want a fast ramp up, so make this proportional to maxActiveConnections
+        int increment = maxActiveConnections >> 2;
+        if (increment == 0)
+          increment = 1;
+        optimalTarget += increment;
+      }
+        
+      //System.out.println("maxTarget = "+maximumTarget+"; fairTarget = "+fairTarget+"; optimalTarget = "+optimalTarget);
+
+      // Now compute actual target
+      int target = maximumTarget;
+      if (target > fairTarget)
+        target = fairTarget;
+      if (target > optimalTarget)
+        target = optimalTarget;
+        
+      // Write these values to the service data variables.
+      // NOTE that there would otherwise be a race condition here; the target value depends on all the calculations above being accurate, and not changing out from under us.
+      // So, that's why we have a write lock around the pool calculations.
+        
+      lockManager.updateServiceData(serviceTypeName, serviceName, pack(target, localInUse));
+        
+      // Now, update our localMax, if it needs it.
+      if (target == localMax)
+        return;
+      localMax = target;
+      notifyAll(); // localMax changed: wake any threads waiting on this bin's monitor
+    }
+    finally
+    {
+      lockManager.leaveWriteLock(targetCalcLockName);
+    }
   }
 
   /** Shut down the bin, and release everything that is waiting on it.
@@ -252,5 +352,85 @@ public class ConnectionBin
     ILockManager lockManager = LockManagerFactory.make(threadContext);
     lockManager.endServiceActivity(serviceTypeName, serviceName);
   }
+  
+  // Protected classes and methods
+  
+  protected static class SumClass implements IServiceDataAcceptor // scan callback: counts services and sums the values posted by all services except ours
+  {
+    protected final String serviceName; // our own service name; its posted values are excluded from the tallies
+    protected int numServices = 0; // number of services seen, including ours
+    protected int globalTargetTally = 0; // sum of "target" values posted by the other services
+    protected int globalInUseTally = 0; // sum of "in-use" values posted by the other services
+    
+    public SumClass(String serviceName)
+    {
+      this.serviceName = serviceName;
+    }
+    
+    @Override
+    public boolean acceptServiceData(String serviceName, byte[] serviceData)
+      throws ManifoldCFException
+    {
+      numServices++;
+
+      if (!serviceName.equals(this.serviceName))
+      {
+        globalTargetTally += unpackTarget(serviceData);
+        globalInUseTally += unpackInUse(serviceData);
+      }
+      return false; // NOTE(review): presumably false tells the scan to continue - confirm against IServiceDataAcceptor
+    }
+
+    public int getNumServices()
+    {
+      return numServices;
+    }
+    
+    public int getGlobalTarget()
+    {
+      return globalTargetTally;
+    }
+    
+    public int getGlobalInUse()
+    {
+      return globalInUseTally;
+    }
+    
+  }
+  
+  protected static int unpackTarget(byte[] data)
+  {
+    if (data == null || data.length != 8) // anything that is not an 8-byte record decodes as 0
+      return 0;
+    return (((int)data[0]) & 0xff) + // bytes 0-3 hold the target, least significant byte first (see pack)
+      ((((int)data[1]) << 8) & 0xff00) +
+      ((((int)data[2]) << 16) & 0xff0000) +
+      ((((int)data[3]) << 24) & 0xff000000);
+  }
+
+  protected static int unpackInUse(byte[] data)
+  {
+    if (data == null || data.length != 8) // anything that is not an 8-byte record decodes as 0
+      return 0;
+    return (((int)data[4]) & 0xff) + // bytes 4-7 hold the in-use count, least significant byte first (see pack)
+      ((((int)data[5]) << 8) & 0xff00) +
+      ((((int)data[6]) << 16) & 0xff0000) +
+      ((((int)data[7]) << 24) & 0xff000000);
+  }
+
+  protected static byte[] pack(int target, int inUse)
+  {
+    byte[] rval = new byte[8]; // layout: bytes 0-3 = target, bytes 4-7 = inUse, each least significant byte first
+    rval[0] = (byte)(target & 0xff);
+    rval[1] = (byte)((target >> 8) & 0xff);
+    rval[2] = (byte)((target >> 16) & 0xff);
+    rval[3] = (byte)((target >> 24) & 0xff);
+    rval[4] = (byte)(inUse & 0xff);
+    rval[5] = (byte)((inUse >> 8) & 0xff);
+    rval[6] = (byte)((inUse >> 16) & 0xff);
+    rval[7] = (byte)((inUse >> 24) & 0xff);
+    return rval; // must stay in step with unpackTarget/unpackInUse, which read this layout
+  }
+
 }