You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/14 23:01:27 UTC

svn commit: r1550976 - in /manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler: FetchBin.java ThrottleBin.java

Author: kwright
Date: Sat Dec 14 22:01:27 2013
New Revision: 1550976

URL: http://svn.apache.org/r1550976
Log:
Add alive/not alive flag to all bins, so we can free stuck threads on shutdown

Modified:
    manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java
    manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java

Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java?rev=1550976&r1=1550975&r2=1550976&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java Sat Dec 14 22:01:27 2013
@@ -30,6 +30,8 @@ import org.apache.manifoldcf.core.system
 */
 public class FetchBin
 {
+  /** This is set to true until the bin is shut down. */
+  protected boolean isAlive = true;
   /** This is the bin name which this connection pool belongs to */
   protected final String binName;
   /** This is the last time a fetch was done on this bin */
@@ -89,8 +91,9 @@ public class FetchBin
   
   /** Wait the necessary time to do the fetch.  Presumes we've reserved the next fetch
   * rights already, via reserveFetchRequest().
+  *@return false if the wait did not complete because the bin was shut down.
   */
-  public synchronized void waitNextFetch()
+  public synchronized boolean waitNextFetch()
     throws InterruptedException
   {
     if (!reserveNextFetch)
@@ -98,6 +101,8 @@ public class FetchBin
     
     while (true)
     {
+      if (!isAlive)
+        return false;
       if (minTimeBetweenFetches == Long.MAX_VALUE)
       {
         // wait forever - but eventually someone will set a smaller interval and wake us up.
@@ -109,7 +114,7 @@ public class FetchBin
         // Compute how long we have to wait, based on the current time and the time of the last fetch.
         long waitAmt = lastFetchTime + minTimeBetweenFetches - currentTime;
         if (waitAmt <= 0L)
-          break;
+          return true;
         wait(waitAmt);
       }
     }
@@ -125,5 +130,12 @@ public class FetchBin
     reserveNextFetch = false;
   }
 
+  /** Shut the bin down, and wake up all threads waiting on it.
+  */
+  public synchronized void shutDown()
+  {
+    isAlive = false;
+    notifyAll();
+  }
 }
 

Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java?rev=1550976&r1=1550975&r2=1550976&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java Sat Dec 14 22:01:27 2013
@@ -67,6 +67,8 @@ import org.apache.manifoldcf.core.system
 */
 public class ThrottleBin
 {
+  /** This signals whether the bin is alive or not. */
+  protected boolean isAlive = true;
   /** This is the bin name which this throttle belongs to. */
   protected final String binName;
   /** This is the reference count for this bin (which records active references) */
@@ -136,60 +138,54 @@ public class ThrottleBin
     
   /** Note the start of an individual byte read of a specified size.  Call this method just before the
   * read request takes place.  Performs the necessary delay prior to reading specified number of bytes from the server.
+  *@return false if the wait was interrupted due to the bin being shut down.
   */
-  public void beginRead(int byteCount)
+  public boolean beginRead(int byteCount)
     throws InterruptedException
   {
     long currentTime = System.currentTimeMillis();
 
     synchronized (this)
     {
-      while (estimateInProgress)
-        wait();
-      if (estimateValid == false)
+      while (true)
       {
-        seriesStartTime = currentTime;
-        estimateInProgress = true;
-        // Add these bytes to the estimated total
-        totalBytesRead += (long)byteCount;
-        // Exit early; this thread isn't going to do any waiting
-        return;
-      }
-    }
-
-    // It is possible for the following code to get interrupted.  If that happens,
-    // we have to unstick the threads that are waiting on the estimate!
-    boolean finished = false;
-    try
-    {
-      long waitTime = 0L;
-      synchronized (this)
-      {
-        // Add these bytes to the estimated total
-        totalBytesRead += (long)byteCount;
+        if (!isAlive)
+          return false;
+        if (estimateInProgress)
+        {
+          wait();
+          continue;
+        }
+        if (estimateValid == false)
+        {
+          seriesStartTime = currentTime;
+          estimateInProgress = true;
+          // Add these bytes to the estimated total
+          totalBytesRead += (long)byteCount;
+          // Exit early; this thread isn't going to do any waiting
+          return true;
+        }
 
         // Estimate the time this read will take, and wait accordingly
         long estimatedTime = (long)(rateEstimate * (double)byteCount);
 
         // Figure out how long the total byte count should take, to meet the constraint
-        long desiredEndTime = seriesStartTime + (long)(((double)totalBytesRead) * minimumMillisecondsPerByte);
+        long desiredEndTime = seriesStartTime + (long)(((double)(totalBytesRead + (long)byteCount)) * minimumMillisecondsPerByte);
 
         // The wait time is the different between our desired end time, minus the estimated time to read the data, and the
         // current time.  But it can't be negative.
-        waitTime = (desiredEndTime - estimatedTime) - currentTime;
-      }
+        long waitTime = (desiredEndTime - estimatedTime) - currentTime;
 
-      if (waitTime > 0L)
-      {
-        ManifoldCF.sleep(waitTime);
-      }
-      finished = true;
-    }
-    finally
-    {
-      if (!finished)
-      {
-        abortRead();
+        // If no wait is needed, go ahead and update what needs to be updated and exit.  Otherwise, do the wait.
+        if (waitTime <= 0L)
+        {
+          // Add these bytes to the estimated total
+          totalBytesRead += (long)byteCount;
+          return true;
+        }
+        
+        this.wait(waitTime);
+        // Back around again...
       }
     }
   }
@@ -244,5 +240,12 @@ public class ThrottleBin
 
   }
 
+  /** Shut down this bin.
+  */
+  public synchronized void shutDown()
+  {
+    isAlive = false;
+    notifyAll();
+  }
 }