You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/15 07:04:55 UTC

svn commit: r1551001 - in /manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core: interfaces/ throttler/

Author: kwright
Date: Sun Dec 15 06:04:55 2013
New Revision: 1551001

URL: http://svn.apache.org/r1551001
Log:
Hook up fetch bins

Modified:
    manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java
    manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IStreamThrottler.java
    manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java
    manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java
    manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java

Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java?rev=1551001&r1=1551000&r2=1551001&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java Sun Dec 15 06:04:55 2013
@@ -31,18 +31,11 @@ public interface IFetchThrottler
   /** Get permission to fetch a document.  This grants permission to start
   * fetching a single document, within the connection that has already been
   * granted permission that created this object.  When done (or aborting), call
-  * releaseFetchDocumentPermission() to note the completion of the document
+  * IStreamThrottler.closeStream() to note the completion of the document
   * fetch activity.
-  *@param currentTime is the current time, in ms. since epoch.
   *@return the stream throttler to use to throttle the actual data access, or null if the system is being shut down.
   */
-  public IStreamThrottler obtainFetchDocumentPermission(long currentTime)
+  public IStreamThrottler obtainFetchDocumentPermission()
     throws InterruptedException;
   
-  /** Release permission to fetch a document.  Call this only when you
-  * called obtainFetchDocumentPermission() successfully earlier.
-  *@param currentTime is the current time, in ms. since epoch.
-  */
-  public void releaseFetchDocumentPermission(long currentTime);
-  
 }

Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IStreamThrottler.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IStreamThrottler.java?rev=1551001&r1=1551000&r2=1551001&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IStreamThrottler.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IStreamThrottler.java Sun Dec 15 06:04:55 2013
@@ -31,19 +31,20 @@ public interface IStreamThrottler
   /** Obtain permission to read a block of bytes.  This method may wait until it is OK to proceed.
   * The throttle group, bin names, etc are already known
   * to this specific interface object, so it is unnecessary to include them here.
-  *@param currentTime is the current time, in ms. since epoch.
   *@param byteCount is the number of bytes to get permissions to read.
   *@return true if the wait took place as planned, or false if the system is being shut down.
   */
-  public boolean obtainReadPermission(long currentTime, int byteCount)
+  public boolean obtainReadPermission(int byteCount)
     throws InterruptedException;
     
   /** Note the completion of the read of a block of bytes.  Call this after
   * obtainReadPermission() was successfully called, and bytes were successfully read.
-  *@param currentTime is the current time, in ms. since epoch.
   *@param origByteCount is the originally requested number of bytes to get permissions to read.
   *@param actualByteCount is the number of bytes actually read.
   */
-  public void releaseReadPermission(long currentTime, int origByteCount, int actualByteCount);
+  public void releaseReadPermission(int origByteCount, int actualByteCount);
   
+  /** Note the stream being closed.
+  */
+  public void closeStream();
 }

Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java?rev=1551001&r1=1551000&r2=1551001&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java Sun Dec 15 06:04:55 2013
@@ -66,18 +66,23 @@ public class FetchBin
   /** Reserve a request to fetch a document from this bin.  The actual fetch is not yet committed
   * with this call, but if it succeeds for all bins associated with the document, then the caller
   * has permission to do the fetch, and can update the last fetch time.
+  *@return false if the fetch bin is being shut down.
   */
-  public synchronized void reserveFetchRequest()
+  public synchronized boolean reserveFetchRequest()
     throws InterruptedException
   {
     // First wait for the ability to even get the next fetch from this bin
     while (true)
     {
+      if (!isAlive)
+        return false;
       if (!reserveNextFetch)
-        break;
+      {
+        reserveNextFetch = true;
+        return true;
+      }
       wait();
     }
-    reserveNextFetch = true;
   }
   
   /** Clear reserved request.
@@ -102,6 +107,7 @@ public class FetchBin
     while (true)
     {
       if (!isAlive)
+        // Leave it to the caller to undo reservations
         return false;
       if (minTimeBetweenFetches == Long.MAX_VALUE)
       {
@@ -114,22 +120,18 @@ public class FetchBin
         // Compute how long we have to wait, based on the current time and the time of the last fetch.
         long waitAmt = lastFetchTime + minTimeBetweenFetches - currentTime;
         if (waitAmt <= 0L)
+        {
+          // Note actual time we start the fetch.
+          if (currentTime > lastFetchTime)
+            lastFetchTime = currentTime;
+          reserveNextFetch = false;
           return true;
+        }
         wait(waitAmt);
       }
     }
   }
   
-  /** Note the beginning of fetch of a document from this bin.
-  *@param currentTime is the actual time the fetch was started.
-  */
-  public synchronized void beginFetch(long currentTime)
-  {
-    if (currentTime > lastFetchTime)
-      lastFetchTime = currentTime;
-    reserveNextFetch = false;
-  }
-
   /** Shut the bin down, and wake up all threads waiting on it.
   */
   public synchronized void shutDown()

Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java?rev=1551001&r1=1551000&r2=1551001&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java Sun Dec 15 06:04:55 2013
@@ -108,7 +108,6 @@ public class ThrottleBin
   * May wait until schedule allows.
   */
   public void beginFetch()
-    throws InterruptedException
   {
     synchronized (this)
     {

Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java?rev=1551001&r1=1551000&r2=1551001&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java Sun Dec 15 06:04:55 2013
@@ -431,15 +431,14 @@ public class Throttler
     /** Release connection */
     public void releaseConnectionPermission(String[] binNames)
     {
-      for (String binName : binNames)
+      synchronized (connectionBins)
       {
-        ConnectionBin bin;
-        synchronized (connectionBins)
+        for (String binName : binNames)
         {
-          bin = connectionBins.get(binName);
+          ConnectionBin bin = connectionBins.get(binName);
+          if (bin != null)
+            bin.noteConnectionDestruction();
         }
-        if (bin != null)
-          bin.noteConnectionDestruction();
       }
     }
     
@@ -453,32 +452,104 @@ public class Throttler
     *@param currentTime is the current time, in ms. since epoch.
     *@return the stream throttler to use to throttle the actual data access, or null if the system is being shut down.
     */
-    public IStreamThrottler obtainFetchDocumentPermission(String[] binNames, long currentTime)
+    public IStreamThrottler obtainFetchDocumentPermission(String[] binNames)
       throws InterruptedException
     {
-      // MHL
+      // First, make sure all the bins exist, and reserve a slot in each
+      int i = 0;
+      while (i < binNames.length)
+      {
+        String binName = binNames[i];
+        FetchBin bin;
+        synchronized (fetchBins)
+        {
+          bin = fetchBins.get(binName);
+          if (bin == null)
+          {
+            bin = new FetchBin(binName);
+            fetchBins.put(binName, bin);
+          }
+        }
+        // Reserve a slot
+        if (!bin.reserveFetchRequest())
+        {
+          // Release previous reservations, and return null
+          while (i > 0)
+          {
+            i--;
+            binName = binNames[i];
+            synchronized (fetchBins)
+            {
+              bin = fetchBins.get(binName);
+            }
+            if (bin != null)
+              bin.clearReservation();
+          }
+          return null;
+        }
+        i++;
+      }
+      
+      // All reservations have been made!  Convert them.
+      // (These may wait; they fail only if a bin has been shut down, which is handled below)
+      i = 0;
+      while (i < binNames.length)
+      {
+        String binName = binNames[i];
+        FetchBin bin;
+        synchronized (fetchBins)
+        {
+          bin = fetchBins.get(binName);
+        }
+        if (bin != null)
+        {
+          if (!bin.waitNextFetch())
+          {
+            // Undo the reservations we haven't processed yet
+            while (i < binNames.length)
+            {
+              binName = binNames[i];
+              synchronized (fetchBins)
+              {
+                bin = fetchBins.get(binName);
+              }
+              if (bin != null)
+                bin.clearReservation();
+              i++;
+            }
+            return null;
+          }
+        }
+        i++;
+      }
+      
+      // Do a "begin fetch" for all throttle bins
+      synchronized (throttleBins)
+      {
+        for (String binName : binNames)
+        {
+          ThrottleBin bin = throttleBins.get(binName);
+          if (bin == null)
+          {
+            bin = new ThrottleBin(binName);
+            throttleBins.put(binName,bin);
+          }
+          bin.beginFetch();
+        }
+      }
+      
       return new StreamThrottler(this, binNames);
     }
     
-    /** Release permission to fetch a document.  Call this only when you
-    * called obtainFetchDocumentPermission() successfully earlier.
-    *@param currentTime is the current time, in ms. since epoch.
-    */
-    public void releaseFetchDocumentPermission(String[] binNames, long currentTime)
-    {
-      // MHL
-    }
-
     // IStreamThrottler support methods
     
     /** Obtain permission to read a block of bytes.  This method may wait until it is OK to proceed.
     * The throttle group, bin names, etc are already known
     * to this specific interface object, so it is unnecessary to include them here.
-    *@param currentTime is the current time, in ms. since epoch.
     *@param byteCount is the number of bytes to get permissions to read.
     *@return true if the wait took place as planned, or false if the system is being shut down.
     */
-    public boolean obtainReadPermission(String[] binNames, long currentTime, int byteCount)
+    public boolean obtainReadPermission(String[] binNames, int byteCount)
       throws InterruptedException
     {
       // MHL
@@ -487,15 +558,29 @@ public class Throttler
       
     /** Note the completion of the read of a block of bytes.  Call this after
     * obtainReadPermission() was successfully called, and bytes were successfully read.
-    *@param currentTime is the current time, in ms. since epoch.
     *@param origByteCount is the originally requested number of bytes to get permissions to read.
     *@param actualByteCount is the number of bytes actually read.
     */
-    public void releaseReadPermission(String[] binNames, long currentTime, int origByteCount, int actualByteCount)
+    public void releaseReadPermission(String[] binNames, int origByteCount, int actualByteCount)
     {
       // MHL
     }
 
+    /** Note the stream being closed.
+    */
+    public void closeStream(String[] binNames)
+    {
+      synchronized (throttleBins)
+      {
+        for (String binName : binNames)
+        {
+          ThrottleBin bin = throttleBins.get(binName);
+          if (bin != null)
+            bin.endFetch();
+        }
+      }
+    }
+
     // Bookkeeping methods
     
     /** Call this periodically.
@@ -657,26 +742,15 @@ public class Throttler
     * granted permission that created this object.  When done (or aborting), call
     * releaseFetchDocumentPermission() to note the completion of the document
     * fetch activity.
-    *@param currentTime is the current time, in ms. since epoch.
     *@return the stream throttler to use to throttle the actual data access, or null if the system is being shut down.
     */
     @Override
-    public IStreamThrottler obtainFetchDocumentPermission(long currentTime)
+    public IStreamThrottler obtainFetchDocumentPermission()
       throws InterruptedException
     {
-      return parent.obtainFetchDocumentPermission(binNames, currentTime);
+      return parent.obtainFetchDocumentPermission(binNames);
     }
     
-    /** Release permission to fetch a document.  Call this only when you
-    * called obtainFetchDocumentPermission() successfully earlier.
-    *@param currentTime is the current time, in ms. since epoch.
-    */
-    @Override
-    public void releaseFetchDocumentPermission(long currentTime)
-    {
-      parent.releaseFetchDocumentPermission(binNames, currentTime);
-    }
-
   }
   
   /** Stream throttler implementation class.
@@ -696,27 +770,33 @@ public class Throttler
     /** Obtain permission to read a block of bytes.  This method may wait until it is OK to proceed.
     * The throttle group, bin names, etc are already known
     * to this specific interface object, so it is unnecessary to include them here.
-    *@param currentTime is the current time, in ms. since epoch.
     *@param byteCount is the number of bytes to get permissions to read.
     *@return true if the wait took place as planned, or false if the system is being shut down.
     */
     @Override
-    public boolean obtainReadPermission(long currentTime, int byteCount)
+    public boolean obtainReadPermission(int byteCount)
       throws InterruptedException
     {
-      return parent.obtainReadPermission(binNames, currentTime, byteCount);
+      return parent.obtainReadPermission(binNames, byteCount);
     }
       
     /** Note the completion of the read of a block of bytes.  Call this after
     * obtainReadPermission() was successfully called, and bytes were successfully read.
-    *@param currentTime is the current time, in ms. since epoch.
     *@param origByteCount is the originally requested number of bytes to get permissions to read.
     *@param actualByteCount is the number of bytes actually read.
     */
     @Override
-    public void releaseReadPermission(long currentTime, int origByteCount, int actualByteCount)
+    public void releaseReadPermission(int origByteCount, int actualByteCount)
+    {
+      parent.releaseReadPermission(binNames, origByteCount, actualByteCount);
+    }
+
+    /** Note the stream being closed.
+    */
+    @Override
+    public void closeStream()
     {
-      parent.releaseReadPermission(binNames, currentTime, origByteCount, actualByteCount);
+      parent.closeStream(binNames);
     }
 
   }