You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/18 13:58:50 UTC
svn commit: r1551929 - in
/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler:
FetchBin.java ThrottleBin.java
Author: kwright
Date: Wed Dec 18 12:58:49 2013
New Revision: 1551929
URL: http://svn.apache.org/r1551929
Log:
Add supporting infrastructure for apportioning fetch rate and byte rate throttling cross-cluster.
Modified:
manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java
manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java
Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java?rev=1551929&r1=1551928&r2=1551929&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java Wed Dec 18 12:58:49 2013
@@ -38,32 +38,48 @@ public class FetchBin
protected final String serviceTypeName;
/** The (anonymous) service name */
protected final String serviceName;
- /** This is the last time a fetch was done on this bin */
- protected long lastFetchTime = 0L;
+ /** The target calculation lock name */
+ protected final String targetCalcLockName;
+
/** This is the minimum time between fetches for this bin, in ms. */
protected long minTimeBetweenFetches = Long.MAX_VALUE;
+
+ /** The local minimum time between fetches */
+ protected long localMinimum = Long.MAX_VALUE;
+
+ /** This is the last time a fetch was done on this bin */
+ protected long lastFetchTime = 0L;
/** Is the next fetch reserved? */
protected boolean reserveNextFetch = false;
/** The service type prefix for fetch bins */
protected final static String serviceTypePrefix = "_FETCHBIN_";
+ /** The target calculation lock prefix */
+ protected final static String targetCalcLockPrefix = "_FETCHBINTARGET_";
+
/** Constructor. */
public FetchBin(IThreadContext threadContext, String throttlingGroupName, String binName)
throws ManifoldCFException
{
this.binName = binName;
this.serviceTypeName = buildServiceTypeName(throttlingGroupName, binName);
+ this.targetCalcLockName = buildTargetCalcLockName(throttlingGroupName, binName);
// Now, register and activate service anonymously, and record the service name we get.
ILockManager lockManager = LockManagerFactory.make(threadContext);
this.serviceName = lockManager.registerServiceBeginServiceActivity(serviceTypeName, null, null);
}
- protected String buildServiceTypeName(String throttlingGroupName, String binName)
+ protected static String buildServiceTypeName(String throttlingGroupName, String binName)
{
return serviceTypePrefix + throttlingGroupName + "_" + binName;
}
+ protected static String buildTargetCalcLockName(String throttlingGroupName, String binName)
+ {
+ return targetCalcLockPrefix + throttlingGroupName + "_" + binName;
+ }
+
/** Get the bin name. */
public String getBinName()
{
@@ -76,8 +92,6 @@ public class FetchBin
{
// Update the number and wake up any waiting threads; they will take care of everything.
this.minTimeBetweenFetches = minTimeBetweenFetches;
- // Wake up everything that's waiting.
- notifyAll();
}
/** Reserve a request to fetch a document from this bin. The actual fetch is not yet committed
@@ -126,7 +140,7 @@ public class FetchBin
if (!isAlive)
// Leave it to the caller to undo reservations
return false;
- if (minTimeBetweenFetches == Long.MAX_VALUE)
+ if (localMinimum == Long.MAX_VALUE)
{
// wait forever - but eventually someone will set a smaller interval and wake us up.
wait();
@@ -135,7 +149,7 @@ public class FetchBin
{
long currentTime = System.currentTimeMillis();
// Compute how long we have to wait, based on the current time and the time of the last fetch.
- long waitAmt = lastFetchTime + minTimeBetweenFetches - currentTime;
+ long waitAmt = lastFetchTime + localMinimum - currentTime;
if (waitAmt <= 0L)
{
// Note actual time we start the fetch.
@@ -153,7 +167,23 @@ public class FetchBin
public synchronized void poll(IThreadContext threadContext)
throws ManifoldCFException
{
- // MHL
+ ILockManager lockManager = LockManagerFactory.make(threadContext);
+ lockManager.enterWriteLock(targetCalcLockName);
+ try
+ {
+ // MHL
+ long target = minTimeBetweenFetches;
+ // MHL
+ if (target == localMinimum)
+ return;
+ localMinimum = target;
+ notifyAll();
+ }
+ finally
+ {
+ lockManager.leaveWriteLock(targetCalcLockName);
+ }
+
}
/** Shut the bin down, and wake up all threads waiting on it.
Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java?rev=1551929&r1=1551928&r2=1551929&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java Wed Dec 18 12:58:49 2013
@@ -20,6 +20,7 @@ package org.apache.manifoldcf.core.throt
import org.apache.manifoldcf.core.interfaces.*;
import org.apache.manifoldcf.core.system.ManifoldCF;
+import java.util.*;
/** Throttles for a bin.
* An instance of this class keeps track of the information needed to bandwidth throttle access
@@ -75,6 +76,15 @@ public class ThrottleBin
protected final String serviceTypeName;
/** The (anonymous) service name */
protected final String serviceName;
+ /** The target calculation lock name */
+ protected final String targetCalcLockName;
+
+ /** The minimum milliseconds per byte */
+ protected double minimumMillisecondsPerByte = Double.MAX_VALUE;
+
+ /** The local minimum milliseconds per byte */
+ protected double localMinimum = Double.MAX_VALUE;
+
/** This is the reference count for this bin (which records active references) */
protected volatile int refCount = 0;
/** The inverse rate estimate of the first fetch, in ms/byte */
@@ -87,11 +97,12 @@ public class ThrottleBin
protected long seriesStartTime = -1L;
/** Total actual bytes read in this series; this includes fetches in progress */
protected long totalBytesRead = -1L;
- /** The minimum milliseconds per byte */
- protected double minimumMillisecondsPerByte = Double.MAX_VALUE;
/** The service type prefix for throttle bins */
protected final static String serviceTypePrefix = "_THROTTLEBIN_";
+
+ /** The target calculation lock prefix */
+ protected final static String targetCalcLockPrefix = "_THROTTLEBINTARGET_";
/** Constructor. */
public ThrottleBin(IThreadContext threadContext, String throttlingGroupName, String binName)
@@ -99,16 +110,22 @@ public class ThrottleBin
{
this.binName = binName;
this.serviceTypeName = buildServiceTypeName(throttlingGroupName, binName);
+ this.targetCalcLockName = buildTargetCalcLockName(throttlingGroupName, binName);
// Now, register and activate service anonymously, and record the service name we get.
ILockManager lockManager = LockManagerFactory.make(threadContext);
this.serviceName = lockManager.registerServiceBeginServiceActivity(serviceTypeName, null, null);
}
- protected String buildServiceTypeName(String throttlingGroupName, String binName)
+ protected static String buildServiceTypeName(String throttlingGroupName, String binName)
{
return serviceTypePrefix + throttlingGroupName + "_" + binName;
}
+ protected static String buildTargetCalcLockName(String throttlingGroupName, String binName)
+ {
+ return targetCalcLockPrefix + throttlingGroupName + "_" + binName;
+ }
+
/** Get the bin name. */
public String getBinName()
{
@@ -116,13 +133,9 @@ public class ThrottleBin
}
/** Update minimumMillisecondsPerBytePerServer */
- public void updateMinimumMillisecondsPerByte(double min)
+ public synchronized void updateMinimumMillisecondsPerByte(double min)
{
- synchronized (this)
- {
- this.minimumMillisecondsPerByte = min;
- notifyAll();
- }
+ this.minimumMillisecondsPerByte = min;
}
/** Note the start of a fetch operation for a bin. Call this method just before the actual stream access begins.
@@ -190,7 +203,7 @@ public class ThrottleBin
}
// If we haven't set a proper throttle yet, wait until we do.
- if (minimumMillisecondsPerByte == Double.MAX_VALUE)
+ if (localMinimum == Double.MAX_VALUE)
{
wait();
continue;
@@ -200,7 +213,7 @@ public class ThrottleBin
long estimatedTime = (long)(rateEstimate * (double)byteCount);
// Figure out how long the total byte count should take, to meet the constraint
- long desiredEndTime = seriesStartTime + (long)(((double)(totalBytesRead + (long)byteCount)) * minimumMillisecondsPerByte);
+ long desiredEndTime = seriesStartTime + (long)(((double)(totalBytesRead + (long)byteCount)) * localMinimum);
// The wait time is the difference between our desired end time, minus the estimated time to read the data, and the
@@ -273,7 +286,23 @@ public class ThrottleBin
public synchronized void poll(IThreadContext threadContext)
throws ManifoldCFException
{
- // MHL
+ ILockManager lockManager = LockManagerFactory.make(threadContext);
+ lockManager.enterWriteLock(targetCalcLockName);
+ try
+ {
+ // MHL
+ double target = minimumMillisecondsPerByte;
+ // MHL
+ if (target == localMinimum)
+ return;
+ localMinimum = target;
+ notifyAll();
+ }
+ finally
+ {
+ lockManager.leaveWriteLock(targetCalcLockName);
+ }
+
}
/** Shut down this bin.