You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/12 16:05:00 UTC

svn commit: r1550432 - in /manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler: ConnectionBin.java ThrottleBin.java

Author: kwright
Date: Thu Dec 12 15:04:59 2013
New Revision: 1550432

URL: http://svn.apache.org/r1550432
Log:
Add bin classes, mostly taken from webcrawler connector

Added:
    manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java   (with props)
    manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java   (with props)

Added: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java?rev=1550432&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java (added)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java Thu Dec 12 15:04:59 2013
@@ -0,0 +1,98 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.core.throttler;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.core.system.ManifoldCF;
+
+/** Connection tracking for a bin.
+*
+* This class keeps track of information needed to figure out throttling for connections,
+* on a bin-by-bin basis.  It is *not*, however, a connection pool.  Actually establishing
+* connections, and pooling established connections, is functionality that must reside in the
+* caller.
+*
+* NOTE WELL: This is entirely local in operation
+*/
+public class ConnectionBin
+{
+  /** The name of the bin whose connections this object tracks; set once by the constructor. */
+  protected final String binName;
+  /** Number of connections in this bin that are signed out and presumably in use; see noteConnectionCreation()/noteConnectionDestruction(). */
+  protected int inUseConnections = 0;
+  /** The latest fetch start time recorded by setLastFetchTime(); 0 until one is recorded.  Presumably epoch milliseconds - TODO confirm with callers. */
+  protected long lastFetchTime = 0L;
+  /** This object is what we synchronize on when we are waiting on a connection to free up for this
+  * bin.  It is a separate object because the ConnectionBin's own monitor protects the integrity of the ConnectionBin itself.
+  * As a lock it should remain a distinct instance (Integer.valueOf(0) would return a shared cached object).  NOTE(review): no method in this class uses it yet. */
+  protected final Integer connectionWait = new Integer(0);
+
+  /** Constructor.  The given bin name is stored as-is; no validation is performed. */
+  public ConnectionBin(String binName)
+  {
+    this.binName = binName;
+  }
+
+  /** Get the bin name, exactly as it was passed to the constructor. */
+  public String getBinName()
+  {
+    return binName;
+  }
+
+  /** Note the creation of an active connection that belongs to this bin.  The slots all must
+  * have been reserved prior to the connection being created; this method only increments the in-use count and enforces no limit.
+  */
+  public synchronized void noteConnectionCreation()
+  {
+    inUseConnections++;
+  }
+
+  /** Note the destruction of an active connection that belongs to this bin.  Decrements the in-use count; there is no
+  * guard against the count going below zero, so each call is expected to match an earlier noteConnectionCreation(). */
+  public synchronized void noteConnectionDestruction()
+  {
+    inUseConnections--;
+  }
+
+  /** Note a new time for connection fetch for this pool.  The stored time only moves forward; an older or equal time is ignored.
+  *@param currentTime is the time the fetch was started.
+  */
+  public synchronized void setLastFetchTime(long currentTime)
+  {
+    if (currentTime > lastFetchTime)
+      lastFetchTime = currentTime;
+  }
+
+  /** Get the last fetch time.
+  *@return the greatest time so far passed to setLastFetchTime(), or 0 if no positive time has been recorded.
+  */
+  public synchronized long getLastFetchTime()
+  {
+    return lastFetchTime;
+  }
+
+  /** Count connections that are in use.
+  *@return connections that are in use, i.e. creations noted minus destructions noted.
+  */
+  public synchronized int countConnections()
+  {
+    return inUseConnections;
+  }
+}
+

Propchange: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java
------------------------------------------------------------------------------
    svn:keywords = Id

Added: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java?rev=1550432&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java (added)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java Thu Dec 12 15:04:59 2013
@@ -0,0 +1,240 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.core.throttler;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.core.system.ManifoldCF;
+
+/** Throttles for a bin.
+* An instance of this class keeps track of the information needed to bandwidth throttle access
+* to a url belonging to a specific bin.
+*
+* In order to calculate
+* the effective "burst" fetches per second and bytes per second, we need to have some idea what the window is.
+* For example, a long hiatus from fetching could cause overuse of the server when fetching resumes, if the
+* window length is too long.
+*
+* One solution to this problem would be to keep a list of the individual fetches as records.  Then, we could
+* "expire" a fetch by discarding the old record.  However, this is quite memory consumptive for all but the
+* smallest intervals.
+*
+* Another, better, solution is to hook into the start and end of individual fetches.  These will, presumably, occur
+* at the fastest possible rate without long pauses spent doing something else.  The only complication is that
+* fetches may well overlap, so we need to "reference count" the fetches to know when to reset the counters.
+* For "fetches per second", we can simply make sure we "schedule" the next fetch at an appropriate time, rather
+* than keep records around.  The overall rate may therefore be somewhat less than the specified rate, but that's perfectly
+* acceptable.
+*
+* Some notes on the algorithms used to limit server bandwidth impact
+* ==================================================================
+*
+* In a single connection case, the algorithm we'd want to use works like this.  On the first chunk of a series,
+* the total length of time and the number of bytes are recorded.  Then, prior to each subsequent chunk, a calculation
+* is done which attempts to hit the bandwidth target by the end of the chunk read, using the rate of the first chunk
+* access as a way of estimating how long it will take to fetch those next n bytes.
+*
+* For a multi-connection case, which this is, it's harder to come up with a good maximum bandwidth estimate,
+* and harder still to "hit the target", because simultaneous fetches will intrude.  The strategy is therefore:
+*
+* 1) The first chunk of any series should proceed without interference from other connections to the same server.
+*    The goal here is to get a decent quality estimate without any possibility of overwhelming the server.
+*
+* 2) The bandwidth of the first chunk is treated as the "maximum bandwidth per connection".  That is, if other
+*    connections are going on, we can presume that each connection will use at most the bandwidth that the first fetch
+*    took.  Thus, by generating end-time estimates based on this number, we are actually being conservative and
+*    using less server bandwidth.
+*
+* 3) For chunks that have started but not finished, we keep track of their size and estimated elapsed time in order to schedule when
+*    new chunks from other connections can start.
+*
+* NOTE WELL: This is entirely local in operation
+*/
+public class ThrottleBin
+{
+  /** This is the bin name which this throttle belongs to; set once by the constructor. */
+  protected final String binName;
+  /** Reference count for this bin: the number of fetches begun via beginFetch() and not yet ended or aborted. */
+  protected volatile int refCount = 0;
+  /** The inverse rate estimate of the first read of a series, in ms/byte; computed by endRead(), 0.0 until then. */
+  protected double rateEstimate = 0.0;
+  /** Flag indicating whether rateEstimate is valid for the current series; while false, the next beginRead() does the estimating. */
+  protected volatile boolean estimateValid = false;
+  /** Flag indicating that an estimating read is in progress; other beginRead() callers wait until it is cleared. */
+  protected volatile boolean estimateInProgress = false;
+  /** The start time of this series (System.currentTimeMillis() as sampled by the estimating beginRead()), or -1 if none. */
+  protected long seriesStartTime = -1L;
+  /** Total bytes read in this series; reads in progress count at their requested size until endRead() substitutes the actual size. */
+  protected long totalBytesRead = -1L;
+
+  /** Constructor.  The given bin name is stored as-is; no validation is performed. */
+  public ThrottleBin(String binName)
+  {
+    this.binName = binName;
+  }
+
+  /** Get the bin name, exactly as it was passed to the constructor. */
+  public String getBinName()
+  {
+    return binName;
+  }
+
+  /** Note the start of a fetch operation for a bin.  Call this method just before the actual stream access begins.
+  * If no other fetch is active, a new series starts and all bandwidth counters are reset.  As written this never waits,
+  * although the signature permits InterruptedException. */
+  public void beginFetch()
+    throws InterruptedException
+  {
+    synchronized (this)
+    {
+      if (refCount == 0)
+      {
+        // No fetch is currently active, so this one starts a new series: reset bandwidth throttling counters
+        estimateValid = false;
+        rateEstimate = 0.0;
+        totalBytesRead = 0L;
+        estimateInProgress = false;
+        seriesStartTime = -1L;
+      }
+      refCount++;
+    }
+
+  }
+
+  /** Abort the fetch.  Drops the reference taken by beginFetch(); unlike endFetch(), it does not report whether the bin is now idle.
+  */
+  public void abortFetch()
+  {
+    synchronized (this)
+    {
+      refCount--;
+    }
+  }
+    
+  /** Note the start of an individual byte read of a specified size.  Call this method just before the read request takes
+  * place.  The first read of a series returns without sleeping and becomes the rate estimate; later reads wait for that estimate,
+  * then sleep as needed to hit the bandwidth target.  byteCount is the size requested; the second argument is the target in ms/byte. */
+  public void beginRead(int byteCount, double minimumMillisecondsPerBytePerServer)
+    throws InterruptedException
+  {
+    long currentTime = System.currentTimeMillis(); // NOTE(review): sampled before the wait() below, so it can be stale when used - confirm intended
+
+    synchronized (this)
+    {
+      while (estimateInProgress) // another thread is doing the estimating read; endRead() or abortRead() will notifyAll()
+        wait();
+      if (estimateValid == false) // no estimate for this series yet, so this read becomes the estimating read
+      {
+        seriesStartTime = currentTime;
+        estimateInProgress = true;
+        // Add these bytes to the estimated total
+        totalBytesRead += (long)byteCount;
+        // Exit early; this thread isn't going to do any waiting
+        return;
+      }
+    }
+
+    // It is possible for the following code to get interrupted.  If that happens,
+    // we have to unstick the threads that are waiting on the estimate!
+    boolean finished = false;
+    try
+    {
+      long waitTime = 0L;
+      synchronized (this)
+      {
+        // Add these bytes to the estimated total
+        totalBytesRead += (long)byteCount;
+
+        // Estimate the time this read will take, and wait accordingly
+        long estimatedTime = (long)(rateEstimate * (double)byteCount);
+
+        // Figure out how long the total byte count should take, to meet the constraint
+        long desiredEndTime = seriesStartTime + (long)(((double)totalBytesRead) * minimumMillisecondsPerBytePerServer);
+
+        // The wait time is the difference between our desired end time, minus the estimated time to read the data, and the
+        // current time.  It may come out negative here; the check below treats that as "no wait".
+        waitTime = (desiredEndTime - estimatedTime) - currentTime;
+      }
+
+      if (waitTime > 0L)
+      {
+        ManifoldCF.sleep(waitTime);
+      }
+      finished = true;
+    }
+    finally
+    {
+      if (!finished) // only true when an exception (such as an interrupt during the sleep) skipped the assignment above
+      {
+        abortRead();
+      }
+    }
+  }
+
+  /** Abort a read in progress.  If an estimating read was under way, clears that state and wakes every thread waiting in
+  * beginRead(); otherwise does nothing.  The bytes added to the series total by beginRead() are not removed. */
+  public void abortRead()
+  {
+    synchronized (this)
+    {
+      if (estimateInProgress)
+      {
+        estimateInProgress = false;
+        notifyAll();
+      }
+    }
+  }
+    
+  /** Note the end of an individual read from the server.  Call this just after an individual read completes.
+  * Pass the actual number of bytes read to the method, along with the count originally requested (presumably the value given
+  * to beginRead()).  If this was the estimating read, the rate estimate is computed here and waiting threads are released. */
+  public void endRead(int originalCount, int actualCount)
+  {
+    long currentTime = System.currentTimeMillis();
+
+    synchronized (this)
+    {
+      totalBytesRead = totalBytesRead + (long)actualCount - (long)originalCount; // swap the requested size for the actual size
+      if (estimateInProgress)
+      {
+        if (actualCount == 0) // NOTE(review): a negative actualCount would produce a negative estimate - confirm callers never pass one
+          // Didn't actually get any bytes, so use 0.0
+          rateEstimate = 0.0;
+        else
+          rateEstimate = ((double)(currentTime - seriesStartTime))/(double)actualCount; // elapsed ms per byte actually read
+        estimateValid = true;
+        estimateInProgress = false;
+        notifyAll();
+      }
+    }
+  }
+
+  /** Note the end of a fetch operation.  Call this method just after the fetch completes.
+  *@return true if no fetches remain active on this bin once this one has ended. */
+  public boolean endFetch()
+  {
+    synchronized (this)
+    {
+      refCount--;
+      return (refCount == 0);
+    }
+
+  }
+
+}
+

Propchange: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java
------------------------------------------------------------------------------
    svn:keywords = Id