You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/14 23:01:27 UTC
svn commit: r1550976 - in
/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler:
FetchBin.java ThrottleBin.java
Author: kwright
Date: Sat Dec 14 22:01:27 2013
New Revision: 1550976
URL: http://svn.apache.org/r1550976
Log:
Add alive/not alive flag to all bins, so we can free stuck threads on shutdown
Modified:
manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java
manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java
Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java?rev=1550976&r1=1550975&r2=1550976&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java Sat Dec 14 22:01:27 2013
@@ -30,6 +30,8 @@ import org.apache.manifoldcf.core.system
*/
public class FetchBin
{
+ /** This is set to true until the bin is shut down. */
+ protected boolean isAlive = true;
/** This is the bin name which this connection pool belongs to */
protected final String binName;
/** This is the last time a fetch was done on this bin */
@@ -89,8 +91,9 @@ public class FetchBin
/** Wait the necessary time to do the fetch. Presumes we've reserved the next fetch
* rights already, via reserveFetchRequest().
+ *@return false if the wait did not complete because the bin was shut down.
*/
- public synchronized void waitNextFetch()
+ public synchronized boolean waitNextFetch()
throws InterruptedException
{
if (!reserveNextFetch)
@@ -98,6 +101,8 @@ public class FetchBin
while (true)
{
+ if (!isAlive)
+ return false;
if (minTimeBetweenFetches == Long.MAX_VALUE)
{
// wait forever - but eventually someone will set a smaller interval and wake us up.
@@ -109,7 +114,7 @@ public class FetchBin
// Compute how long we have to wait, based on the current time and the time of the last fetch.
long waitAmt = lastFetchTime + minTimeBetweenFetches - currentTime;
if (waitAmt <= 0L)
- break;
+ return true;
wait(waitAmt);
}
}
@@ -125,5 +130,12 @@ public class FetchBin
reserveNextFetch = false;
}
+ /** Shut the bin down, and wake up all threads waiting on it.
+ */
+ public synchronized void shutDown()
+ {
+ isAlive = false;
+ notifyAll();
+ }
}
Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java?rev=1550976&r1=1550975&r2=1550976&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java Sat Dec 14 22:01:27 2013
@@ -67,6 +67,8 @@ import org.apache.manifoldcf.core.system
*/
public class ThrottleBin
{
+ /** This signals whether the bin is alive or not. */
+ protected boolean isAlive = true;
/** This is the bin name which this throttle belongs to. */
protected final String binName;
/** This is the reference count for this bin (which records active references) */
@@ -136,60 +138,54 @@ public class ThrottleBin
/** Note the start of an individual byte read of a specified size. Call this method just before the
* read request takes place. Performs the necessary delay prior to reading specified number of bytes from the server.
+ *@return false if the wait was interrupted due to the bin being shut down.
*/
- public void beginRead(int byteCount)
+ public boolean beginRead(int byteCount)
throws InterruptedException
{
long currentTime = System.currentTimeMillis();
synchronized (this)
{
- while (estimateInProgress)
- wait();
- if (estimateValid == false)
+ while (true)
{
- seriesStartTime = currentTime;
- estimateInProgress = true;
- // Add these bytes to the estimated total
- totalBytesRead += (long)byteCount;
- // Exit early; this thread isn't going to do any waiting
- return;
- }
- }
-
- // It is possible for the following code to get interrupted. If that happens,
- // we have to unstick the threads that are waiting on the estimate!
- boolean finished = false;
- try
- {
- long waitTime = 0L;
- synchronized (this)
- {
- // Add these bytes to the estimated total
- totalBytesRead += (long)byteCount;
+ if (!isAlive)
+ return false;
+ if (estimateInProgress)
+ {
+ wait();
+ continue;
+ }
+ if (estimateValid == false)
+ {
+ seriesStartTime = currentTime;
+ estimateInProgress = true;
+ // Add these bytes to the estimated total
+ totalBytesRead += (long)byteCount;
+ // Exit early; this thread isn't going to do any waiting
+ return true;
+ }
// Estimate the time this read will take, and wait accordingly
long estimatedTime = (long)(rateEstimate * (double)byteCount);
// Figure out how long the total byte count should take, to meet the constraint
- long desiredEndTime = seriesStartTime + (long)(((double)totalBytesRead) * minimumMillisecondsPerByte);
+ long desiredEndTime = seriesStartTime + (long)(((double)(totalBytesRead + (long)byteCount)) * minimumMillisecondsPerByte);
// The wait time is the different between our desired end time, minus the estimated time to read the data, and the
// current time. But it can't be negative.
- waitTime = (desiredEndTime - estimatedTime) - currentTime;
- }
+ long waitTime = (desiredEndTime - estimatedTime) - currentTime;
- if (waitTime > 0L)
- {
- ManifoldCF.sleep(waitTime);
- }
- finished = true;
- }
- finally
- {
- if (!finished)
- {
- abortRead();
+ // If no wait is needed, go ahead and update what needs to be updated and exit. Otherwise, do the wait.
+ if (waitTime <= 0L)
+ {
+ // Add these bytes to the estimated total
+ totalBytesRead += (long)byteCount;
+ return true;
+ }
+
+ this.wait(waitTime);
+ // Back around again...
}
}
}
@@ -244,5 +240,12 @@ public class ThrottleBin
}
+ /** Shut down this bin.
+ */
+ public synchronized void shutDown()
+ {
+ isAlive = false;
+ notifyAll();
+ }
}