You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/14 10:12:52 UTC

svn commit: r1550906 - in /manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core: interfaces/ throttler/

Author: kwright
Date: Sat Dec 14 09:12:51 2013
New Revision: 1550906

URL: http://svn.apache.org/r1550906
Log:
More development

Modified:
    manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IConnectionThrottler.java
    manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java
    manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IStreamThrottler.java
    manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionThrottler.java
    manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java
    manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java

Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IConnectionThrottler.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IConnectionThrottler.java?rev=1550906&r1=1550905&r2=1550906&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IConnectionThrottler.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IConnectionThrottler.java Sat Dec 14 09:12:51 2013
@@ -60,7 +60,7 @@ public interface IConnectionThrottler
   *@param throttleGroupType is the throttle group type.
   *@param throttleGroup is the throttle group.
   *@param binNames is the set of bin names to throttle for, within the throttle group.
-  *@return the fetch throttler to use when performing fetches from the corresponding connection.
+  *@return the fetch throttler to use when performing fetches from the corresponding connection, or null if the system is being shut down.
   */
   public IFetchThrottler obtainConnectionPermission(String throttleGroupType , String throttleGroup,
     String[] binNames)

Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java?rev=1550906&r1=1550905&r2=1550906&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java Sat Dec 14 09:12:51 2013
@@ -34,7 +34,7 @@ public interface IFetchThrottler
   * releaseFetchDocumentPermission() to note the completion of the document
   * fetch activity.
   *@param currentTime is the current time, in ms. since epoch.
-  *@return the stream throttler to use to throttle the actual data access.
+  *@return the stream throttler to use to throttle the actual data access, or null if the system is being shut down.
   */
   public IStreamThrottler obtainFetchDocumentPermission(long currentTime)
     throws InterruptedException;

Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IStreamThrottler.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IStreamThrottler.java?rev=1550906&r1=1550905&r2=1550906&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IStreamThrottler.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IStreamThrottler.java Sat Dec 14 09:12:51 2013
@@ -33,8 +33,9 @@ public interface IStreamThrottler
   * to this specific interface object, so it is unnecessary to include them here.
   *@param currentTime is the current time, in ms. since epoch.
   *@param byteCount is the number of bytes to get permissions to read.
+  *@return true if the wait took place as planned, or false if the system is being shut down.
   */
-  public void obtainReadPermission(long currentTime, int byteCount)
+  public boolean obtainReadPermission(long currentTime, int byteCount)
     throws InterruptedException;
     
   /** Note the completion of the read of a block of bytes.  Call this after

Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionThrottler.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionThrottler.java?rev=1550906&r1=1550905&r2=1550906&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionThrottler.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionThrottler.java Sat Dec 14 09:12:51 2013
@@ -86,14 +86,21 @@ public class ConnectionThrottler impleme
   *@param throttleGroupType is the throttle group type.
   *@param throttleGroup is the throttle group.
   *@param binNames is the set of bin names to throttle for, within the throttle group.
-  *@return the fetch throttler to use when performing fetches from the corresponding connection.
+  *@return the fetch throttler to use when performing fetches from the corresponding connection, or null if the system is being shut down.
   */
   @Override
   public IFetchThrottler obtainConnectionPermission(String throttleGroupType , String throttleGroup,
     String[] binNames)
     throws ManifoldCFException
   {
-    return throttler.obtainConnectionPermission(threadContext, throttleGroupType, throttleGroup, binNames);
+    try
+    {
+      return throttler.obtainConnectionPermission(throttleGroupType, throttleGroup, binNames);
+    }
+    catch (InterruptedException e)
+    {
+      throw new ManifoldCFException(e.getMessage(),ManifoldCFException.INTERRUPTED);
+    }
   }
   
   /** Determine whether to release a pooled connection.  This method returns the number of bins
@@ -110,7 +117,7 @@ public class ConnectionThrottler impleme
   public int overConnectionQuotaCount(String throttleGroupType, String throttleGroup, String[] binNames)
     throws ManifoldCFException
   {
-    return throttler.overConnectionQuotaCount(threadContext, throttleGroupType, throttleGroup, binNames);
+    return throttler.overConnectionQuotaCount(throttleGroupType, throttleGroup, binNames);
   }
   
   /** Release permission to use one connection. This presumes that obtainConnectionPermission()
@@ -123,7 +130,7 @@ public class ConnectionThrottler impleme
   public void releaseConnectionPermission(String throttleGroupType, String throttleGroup, String[] binNames)
     throws ManifoldCFException
   {
-    throttler.releaseConnectionPermission(threadContext, throttleGroupType, throttleGroup, binNames);
+    throttler.releaseConnectionPermission(throttleGroupType, throttleGroup, binNames);
   }
   
   /** Poll periodically, to update cluster-wide statistics and allocation.

Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java?rev=1550906&r1=1550905&r2=1550906&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java Sat Dec 14 09:12:51 2013
@@ -81,8 +81,8 @@ public class ThrottleBin
   protected long seriesStartTime = -1L;
   /** Total actual bytes read in this series; this includes fetches in progress */
   protected long totalBytesRead = -1L;
-  /** The minimum milliseconds per byte per server */
-  protected double minimumMillisecondsPerBytePerServer = Double.MAX_VALUE;
+  /** The minimum milliseconds per byte */
+  protected double minimumMillisecondsPerByte = Double.MAX_VALUE;
   
   /** Constructor. */
   public ThrottleBin(String binName)
@@ -97,9 +97,9 @@ public class ThrottleBin
   }
 
   /** Update minimumMillisecondsPerBytePerServer */
-  public void updateMinimumMillisecondsPerBytePerServer(double min)
+  public void updateMinimumMillisecondsPerByte(double min)
   {
-    this.minimumMillisecondsPerBytePerServer = min;
+    this.minimumMillisecondsPerByte = min;
   }
   
   /** Note the start of a fetch operation for a bin.  Call this method just before the actual stream access begins.
@@ -172,7 +172,7 @@ public class ThrottleBin
         long estimatedTime = (long)(rateEstimate * (double)byteCount);
 
         // Figure out how long the total byte count should take, to meet the constraint
-        long desiredEndTime = seriesStartTime + (long)(((double)totalBytesRead) * minimumMillisecondsPerBytePerServer);
+        long desiredEndTime = seriesStartTime + (long)(((double)totalBytesRead) * minimumMillisecondsPerByte);
 
         // The wait time is the different between our desired end time, minus the estimated time to read the data, and the
         // current time.  But it can't be negative.

Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java?rev=1550906&r1=1550905&r2=1550906&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java Sat Dec 14 09:12:51 2013
@@ -52,6 +52,17 @@ public class Throttler
   {
   }
   
+  // There are a lot of synchronizers to coordinate here.  They are indeed hierarchical.  It is not possible to simply
+  // throw a synchronizer at every level, and require that we hold all of them, because when we wait somewhere in the
+  // inner level, we will continue to hold locks and block access to all the outer levels.
+  //
+  // Instead, I've opted for a model whereby individual resources are protected.  This is tricky to coordinate, though,
+  // because (for instance) after a resource has been removed from the hash table, it had better be cleaned up
+  // thoroughly before the outer lock is released, or two versions of the resource might wind up coming into existence.
+  // The general rule is therefore:
+  // (1) Creation or deletion of resources involves locking the parent where the resource is being added or removed
+  // (2) Anything that waits CANNOT also add or remove.
+  
   /** Get all existing throttle groups for a throttle group type.
   * The throttle group type typically describes a connector class, while the throttle group represents
   * a namespace of bin names specific to that connector class.
@@ -74,15 +85,14 @@ public class Throttler
   public void removeThrottleGroup(IThreadContext threadContext, String throttleGroupType, String throttleGroup)
     throws ManifoldCFException
   {
-    ThrottlingGroups tg;
+    // Removal.  Lock the whole hierarchy.
     synchronized (throttleGroupsHash)
     {
-      tg = throttleGroupsHash.get(throttleGroupType);
-    }
-
-    if (tg != null)
-    {
-      tg.removeThrottleGroup(threadContext, throttleGroup);
+      ThrottlingGroups tg = throttleGroupsHash.get(throttleGroupType);
+      if (tg != null)
+      {
+        tg.removeThrottleGroup(threadContext, throttleGroup);
+      }
     }
   }
   
@@ -95,18 +105,17 @@ public class Throttler
   public void updateThrottleSpecification(IThreadContext threadContext, String throttleGroupType, String throttleGroup, IThrottleSpec throttleSpec)
     throws ManifoldCFException
   {
-    ThrottlingGroups tg;
+    // Potential addition.  Lock the whole hierarchy.
     synchronized (throttleGroupsHash)
     {
-      tg = throttleGroupsHash.get(throttleGroupType);
+      ThrottlingGroups tg = throttleGroupsHash.get(throttleGroupType);
       if (tg == null)
       {
         tg = new ThrottlingGroups(throttleGroupType);
         throttleGroupsHash.put(throttleGroupType, tg);
       }
+      tg.updateThrottleSpecification(threadContext, throttleGroup, throttleSpec);
     }
-    
-    tg.updateThrottleSpecification(threadContext, throttleGroup, throttleSpec);
   }
 
   /** Get permission to use a connection, which is described by the passed array of bin names.
@@ -116,14 +125,26 @@ public class Throttler
   *@param throttleGroupType is the throttle group type.
   *@param throttleGroup is the throttle group.
   *@param binNames is the set of bin names to throttle for, within the throttle group.
-  *@return the fetch throttler to use when performing fetches from the corresponding connection.
+  *@return the fetch throttler to use when performing fetches from the corresponding connection, or null if the system is being shut down.
   */
-  public IFetchThrottler obtainConnectionPermission(IThreadContext threadContext, String throttleGroupType , String throttleGroup,
+  public IFetchThrottler obtainConnectionPermission(String throttleGroupType , String throttleGroup,
     String[] binNames)
-    throws ManifoldCFException
+    throws InterruptedException
   {
-    // MHL
-    return null;
+    // This method may wait at the innermost level.  We therefore do not lock the hierarchy, BUT
+    // we must code for the possibility that the hierarchy is fluid, and may dissolve.  Under such
+    // conditions we could simply retry.  But in this case, the hierarchy has state: there is an
+    // explicit creation step (updateThrottleSpecification) that must be called to establish the
+    // innermost object and set it up correctly.  It is therefore meaningful if we do not find
+    // the expected structure: we return null in that case.
+    ThrottlingGroups tg;
+    synchronized (throttleGroupsHash)
+    {
+      tg = throttleGroupsHash.get(throttleGroupType);
+    }
+    if (tg == null)
+      return null;
+    return tg.obtainConnectionPermission(throttleGroup, binNames);
   }
   
   /** Determine whether to release a pooled connection.  This method returns the number of bins
@@ -136,11 +157,14 @@ public class Throttler
   *@param binNames is the set of bin names to throttle for, within the throttle group.
   *@return the number of bins that are over quota, or zero if none of them are.
   */
-  public int overConnectionQuotaCount(IThreadContext threadContext, String throttleGroupType, String throttleGroup, String[] binNames)
-    throws ManifoldCFException
+  public int overConnectionQuotaCount(String throttleGroupType, String throttleGroup, String[] binNames)
   {
-    // MHL
-    return 0;
+    // No waiting, so it is OK to lock everything.
+    synchronized (throttleGroupsHash)
+    {
+      // MHL
+      return 0;
+    }
   }
   
   /** Release permission to use one connection. This presumes that obtainConnectionPermission()
@@ -149,10 +173,13 @@ public class Throttler
   *@param throttleGroup is the throttle group.
   *@param binNames is the set of bin names to throttle for, within the throttle group.
   */
-  public void releaseConnectionPermission(IThreadContext threadContext, String throttleGroupType, String throttleGroup, String[] binNames)
-    throws ManifoldCFException
+  public void releaseConnectionPermission(String throttleGroupType, String throttleGroup, String[] binNames)
   {
-    // MHL
+    // No waiting, so it is ok to lock the entire tree.
+    synchronized (throttleGroupsHash)
+    {
+      // MHL
+    }
   }
   
   /** Poll periodically.
@@ -160,15 +187,13 @@ public class Throttler
   public void poll(IThreadContext threadContext, String throttleGroupType)
     throws ManifoldCFException
   {
-    // Find the right pool, and poll that
-    ThrottlingGroups tg;
+    // No waiting, so lock the entire tree.
     synchronized (throttleGroupsHash)
     {
-      tg = throttleGroupsHash.get(throttleGroupType);
+      ThrottlingGroups tg = throttleGroupsHash.get(throttleGroupType);
+      if (tg != null)
+        tg.poll(threadContext);
     }
-    
-    if (tg != null)
-      tg.poll(threadContext);
       
   }
   
@@ -177,6 +202,7 @@ public class Throttler
   public void freeUnusedResources(IThreadContext threadContext)
     throws ManifoldCFException
   {
+    // This potentially affects the entire hierarchy.
     // Go through the whole pool and clean it out
     synchronized (throttleGroupsHash)
     {
@@ -194,6 +220,7 @@ public class Throttler
   public void destroy(IThreadContext threadContext)
     throws ManifoldCFException
   {
+    // This affects the entire hierarchy, so lock the whole thing.
     // Go through the whole pool and clean it out
     synchronized (throttleGroupsHash)
     {
@@ -249,8 +276,25 @@ public class Throttler
       }
     }
     
+    /** Obtain connection permission.
+    *@return null if the hierarchy has changed!
+    */
+    public IFetchThrottler obtainConnectionPermission(String throttleGroup, String[] binNames)
+      throws InterruptedException
+    {
+      // Can't lock the hierarchy here.
+      ThrottlingGroup g;
+      synchronized (groups)
+      {
+        g = groups.get(throttleGroup);
+      }
+      if (g == null)
+        return null;
+      return g.obtainConnectionPermission(binNames);
+    }
+
     /** Remove specified throttle group */
-    public synchronized void removeThrottleGroup(IThreadContext threadContext, String throttleGroup)
+    public void removeThrottleGroup(IThreadContext threadContext, String throttleGroup)
       throws ManifoldCFException
     {
       // Must synch the whole thing, because otherwise there would be a risk of someone recreating the
@@ -265,7 +309,8 @@ public class Throttler
       }
     }
     
-    /** Poll this set of throttle groups */
+    /** Poll this set of throttle groups.
+    */
     public void poll(IThreadContext threadContext)
       throws ManifoldCFException
     {
@@ -360,7 +405,20 @@ public class Throttler
       this.throttleSpec = throttleSpec;
     }
     
-    // MHL
+    /** Obtain connection permission.
+    *@return null if we are marked as 'not alive'.
+    */
+    public IFetchThrottler obtainConnectionPermission(String[] binNames)
+      throws InterruptedException
+    {
+      // First, make sure all the bins exist
+      // MHL
+      // Reserve a slot in all bins
+      // MHL
+      // Wait on each reserved bin in turn
+      // MHL
+      return null;
+    }
     
     /** Call this periodically.
     */
@@ -391,7 +449,7 @@ public class Throttler
       {
         for (ThrottleBin bin : throttleBins.values())
         {
-          bin.updateMinimumMillisecondsPerBytePerServer(throttleSpec.getMinimumMillisecondsPerByte(bin.getBinName()));
+          bin.updateMinimumMillisecondsPerByte(throttleSpec.getMinimumMillisecondsPerByte(bin.getBinName()));
         }
       }