You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/18 13:58:50 UTC

svn commit: r1551929 - in /manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler: FetchBin.java ThrottleBin.java

Author: kwright
Date: Wed Dec 18 12:58:49 2013
New Revision: 1551929

URL: http://svn.apache.org/r1551929
Log:
Add supporting infrastructure for apportioning fetch rate and byte rate throttling cross-cluster.

Modified:
    manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java
    manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java

Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java?rev=1551929&r1=1551928&r2=1551929&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java Wed Dec 18 12:58:49 2013
@@ -38,32 +38,48 @@ public class FetchBin
   protected final String serviceTypeName;
   /** The (anonymous) service name */
   protected final String serviceName;
-  /** This is the last time a fetch was done on this bin */
-  protected long lastFetchTime = 0L;
+  /** The target calculation lock name */
+  protected final String targetCalcLockName;
+
   /** This is the minimum time between fetches for this bin, in ms. */
   protected long minTimeBetweenFetches = Long.MAX_VALUE;
+
+  /** The local minimum time between fetches */
+  protected long localMinimum = Long.MAX_VALUE;
+
+  /** This is the last time a fetch was done on this bin */
+  protected long lastFetchTime = 0L;
   /** Is the next fetch reserved? */
   protected boolean reserveNextFetch = false;
 
   /** The service type prefix for fetch bins */
   protected final static String serviceTypePrefix = "_FETCHBIN_";
 
+  /** The target calculation lock prefix */
+  protected final static String targetCalcLockPrefix = "_FETCHBINTARGET_";
+
   /** Constructor. */
   public FetchBin(IThreadContext threadContext, String throttlingGroupName, String binName)
     throws ManifoldCFException
   {
     this.binName = binName;
     this.serviceTypeName = buildServiceTypeName(throttlingGroupName, binName);
+    this.targetCalcLockName = buildTargetCalcLockName(throttlingGroupName, binName);
     // Now, register and activate service anonymously, and record the service name we get.
     ILockManager lockManager = LockManagerFactory.make(threadContext);
     this.serviceName = lockManager.registerServiceBeginServiceActivity(serviceTypeName, null, null);
   }
 
-  protected String buildServiceTypeName(String throttlingGroupName, String binName)
+  protected static String buildServiceTypeName(String throttlingGroupName, String binName)
   {
     return serviceTypePrefix + throttlingGroupName + "_" + binName;
   }
 
+  protected static String buildTargetCalcLockName(String throttlingGroupName, String binName)
+  {
+    return targetCalcLockPrefix + throttlingGroupName + "_" + binName;
+  }
+
   /** Get the bin name. */
   public String getBinName()
   {
@@ -76,8 +92,6 @@ public class FetchBin
   {
     // Update the number and wake up any waiting threads; they will take care of everything.
     this.minTimeBetweenFetches = minTimeBetweenFetches;
-    // Wake up everything that's waiting.
-    notifyAll();
   }
 
   /** Reserve a request to fetch a document from this bin.  The actual fetch is not yet committed
@@ -126,7 +140,7 @@ public class FetchBin
       if (!isAlive)
         // Leave it to the caller to undo reservations
         return false;
-      if (minTimeBetweenFetches == Long.MAX_VALUE)
+      if (localMinimum == Long.MAX_VALUE)
       {
         // wait forever - but eventually someone will set a smaller interval and wake us up.
         wait();
@@ -135,7 +149,7 @@ public class FetchBin
       {
         long currentTime = System.currentTimeMillis();
         // Compute how long we have to wait, based on the current time and the time of the last fetch.
-        long waitAmt = lastFetchTime + minTimeBetweenFetches - currentTime;
+        long waitAmt = lastFetchTime + localMinimum - currentTime;
         if (waitAmt <= 0L)
         {
           // Note actual time we start the fetch.
@@ -153,7 +167,23 @@ public class FetchBin
   public synchronized void poll(IThreadContext threadContext)
     throws ManifoldCFException
   {
-    // MHL
+    ILockManager lockManager = LockManagerFactory.make(threadContext);
+    lockManager.enterWriteLock(targetCalcLockName);
+    try
+    {
+      // MHL
+      long target = minTimeBetweenFetches;
+      // MHL
+      if (target == localMinimum)
+        return;
+      localMinimum = target;
+      notifyAll();
+    }
+    finally
+    {
+      lockManager.leaveWriteLock(targetCalcLockName);
+    }
+
   }
 
   /** Shut the bin down, and wake up all threads waiting on it.

Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java?rev=1551929&r1=1551928&r2=1551929&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java Wed Dec 18 12:58:49 2013
@@ -20,6 +20,7 @@ package org.apache.manifoldcf.core.throt
 
 import org.apache.manifoldcf.core.interfaces.*;
 import org.apache.manifoldcf.core.system.ManifoldCF;
+import java.util.*;
 
 /** Throttles for a bin.
 * An instance of this class keeps track of the information needed to bandwidth throttle access
@@ -75,6 +76,15 @@ public class ThrottleBin
   protected final String serviceTypeName;
   /** The (anonymous) service name */
   protected final String serviceName;
+  /** The target calculation lock name */
+  protected final String targetCalcLockName;
+
+  /** The minimum milliseconds per byte */
+  protected double minimumMillisecondsPerByte = Double.MAX_VALUE;
+
+  /** The local minimum milliseconds per byte */
+  protected double localMinimum = Double.MAX_VALUE;
+  
   /** This is the reference count for this bin (which records active references) */
   protected volatile int refCount = 0;
   /** The inverse rate estimate of the first fetch, in ms/byte */
@@ -87,11 +97,12 @@ public class ThrottleBin
   protected long seriesStartTime = -1L;
   /** Total actual bytes read in this series; this includes fetches in progress */
   protected long totalBytesRead = -1L;
-  /** The minimum milliseconds per byte */
-  protected double minimumMillisecondsPerByte = Double.MAX_VALUE;
   
   /** The service type prefix for throttle bins */
   protected final static String serviceTypePrefix = "_THROTTLEBIN_";
+  
+  /** The target calculation lock prefix */
+  protected final static String targetCalcLockPrefix = "_THROTTLEBINTARGET_";
 
   /** Constructor. */
   public ThrottleBin(IThreadContext threadContext, String throttlingGroupName, String binName)
@@ -99,16 +110,22 @@ public class ThrottleBin
   {
     this.binName = binName;
     this.serviceTypeName = buildServiceTypeName(throttlingGroupName, binName);
+    this.targetCalcLockName = buildTargetCalcLockName(throttlingGroupName, binName);
     // Now, register and activate service anonymously, and record the service name we get.
     ILockManager lockManager = LockManagerFactory.make(threadContext);
     this.serviceName = lockManager.registerServiceBeginServiceActivity(serviceTypeName, null, null);
   }
 
-  protected String buildServiceTypeName(String throttlingGroupName, String binName)
+  protected static String buildServiceTypeName(String throttlingGroupName, String binName)
   {
     return serviceTypePrefix + throttlingGroupName + "_" + binName;
   }
   
+  protected static String buildTargetCalcLockName(String throttlingGroupName, String binName)
+  {
+    return targetCalcLockPrefix + throttlingGroupName + "_" + binName;
+  }
+
   /** Get the bin name. */
   public String getBinName()
   {
@@ -116,13 +133,9 @@ public class ThrottleBin
   }
 
   /** Update minimumMillisecondsPerBytePerServer */
-  public void updateMinimumMillisecondsPerByte(double min)
+  public synchronized void updateMinimumMillisecondsPerByte(double min)
   {
-    synchronized (this)
-    {
-      this.minimumMillisecondsPerByte = min;
-      notifyAll();
-    }
+    this.minimumMillisecondsPerByte = min;
   }
   
   /** Note the start of a fetch operation for a bin.  Call this method just before the actual stream access begins.
@@ -190,7 +203,7 @@ public class ThrottleBin
         }
 
         // If we haven't set a proper throttle yet, wait until we do.
-        if (minimumMillisecondsPerByte == Double.MAX_VALUE)
+        if (localMinimum == Double.MAX_VALUE)
         {
           wait();
           continue;
@@ -200,7 +213,7 @@ public class ThrottleBin
         long estimatedTime = (long)(rateEstimate * (double)byteCount);
 
         // Figure out how long the total byte count should take, to meet the constraint
-        long desiredEndTime = seriesStartTime + (long)(((double)(totalBytesRead + (long)byteCount)) * minimumMillisecondsPerByte);
+        long desiredEndTime = seriesStartTime + (long)(((double)(totalBytesRead + (long)byteCount)) * localMinimum);
 
 
         // The wait time is the difference between our desired end time, minus the estimated time to read the data, and the
@@ -273,7 +286,23 @@ public class ThrottleBin
   public synchronized void poll(IThreadContext threadContext)
     throws ManifoldCFException
   {
-    // MHL
+    ILockManager lockManager = LockManagerFactory.make(threadContext);
+    lockManager.enterWriteLock(targetCalcLockName);
+    try
+    {
+      // MHL
+      double target = minimumMillisecondsPerByte;
+      // MHL
+      if (target == localMinimum)
+        return;
+      localMinimum = target;
+      notifyAll();
+    }
+    finally
+    {
+      lockManager.leaveWriteLock(targetCalcLockName);
+    }
+
   }
 
   /** Shut down this bin.