You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/15 07:32:33 UTC
svn commit: r1551002 - in
/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler:
ThrottleBin.java Throttler.java
Author: kwright
Date: Sun Dec 15 06:32:33 2013
New Revision: 1551002
URL: http://svn.apache.org/r1551002
Log:
Hook up throttle bins
Modified:
manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java
manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java
Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java?rev=1551002&r1=1551001&r2=1551002&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java Sun Dec 15 06:32:33 2013
@@ -142,7 +142,6 @@ public class ThrottleBin
public boolean beginRead(int byteCount)
throws InterruptedException
{
- long currentTime = System.currentTimeMillis();
synchronized (this)
{
@@ -155,6 +154,10 @@ public class ThrottleBin
wait();
continue;
}
+
+ // Update the current time
+ long currentTime = System.currentTimeMillis();
+
if (estimateValid == false)
{
seriesStartTime = currentTime;
@@ -171,6 +174,7 @@ public class ThrottleBin
// Figure out how long the total byte count should take, to meet the constraint
long desiredEndTime = seriesStartTime + (long)(((double)(totalBytesRead + (long)byteCount)) * minimumMillisecondsPerByte);
+
// The wait time is the difference between our desired end time, minus the estimated time to read the data, and the
// current time. But it can't be negative.
long waitTime = (desiredEndTime - estimatedTime) - currentTime;
@@ -208,8 +212,6 @@ public class ThrottleBin
*/
public void endRead(int originalCount, int actualCount)
{
- long currentTime = System.currentTimeMillis();
-
synchronized (this)
{
totalBytesRead = totalBytesRead + (long)actualCount - (long)originalCount;
@@ -219,7 +221,7 @@ public class ThrottleBin
// Didn't actually get any bytes, so use 0.0
rateEstimate = 0.0;
else
- rateEstimate = ((double)(currentTime - seriesStartTime))/(double)actualCount;
+ rateEstimate = ((double)(System.currentTimeMillis() - seriesStartTime))/(double)actualCount;
estimateValid = true;
estimateInProgress = false;
notifyAll();
Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java?rev=1551002&r1=1551001&r2=1551002&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java Sun Dec 15 06:32:33 2013
@@ -471,7 +471,7 @@ public class Throttler
}
}
// Reserve a slot
- if (!bin.reserveFetchRequest())
+ if (bin == null || !bin.reserveFetchRequest())
{
// Release previous reservations, and return null
while (i > 0)
@@ -552,8 +552,34 @@ public class Throttler
public boolean obtainReadPermission(String[] binNames, int byteCount)
throws InterruptedException
{
- // MHL
- return false;
+ int i = 0;
+ while (i < binNames.length)
+ {
+ String binName = binNames[i];
+ ThrottleBin bin;
+ synchronized (throttleBins)
+ {
+ bin = throttleBins.get(binName);
+ }
+ if (bin == null || bin.beginRead(byteCount))
+ {
+ // End bins we've already done, and exit
+ while (i > 0)
+ {
+ i--;
+ binName = binNames[i];
+ synchronized (throttleBins)
+ {
+ bin = throttleBins.get(binName);
+ }
+ if (bin != null)
+ bin.endRead(byteCount,0);
+ }
+ return false;
+ }
+ i++;
+ }
+ return true;
}
/** Note the completion of the read of a block of bytes. Call this after
@@ -563,7 +589,15 @@ public class Throttler
*/
public void releaseReadPermission(String[] binNames, int origByteCount, int actualByteCount)
{
- // MHL
+ synchronized (throttleBins)
+ {
+ for (String binName : binNames)
+ {
+ ThrottleBin bin = throttleBins.get(binName);
+ if (bin != null)
+ bin.endRead(origByteCount, actualByteCount);
+ }
+ }
}
/** Note the stream being closed.