You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/12 16:05:00 UTC
svn commit: r1550432 - in
/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler:
ConnectionBin.java ThrottleBin.java
Author: kwright
Date: Thu Dec 12 15:04:59 2013
New Revision: 1550432
URL: http://svn.apache.org/r1550432
Log:
Add bin classes, mostly taken from webcrawler connector
Added:
manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java (with props)
manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java (with props)
Added: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java?rev=1550432&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java (added)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java Thu Dec 12 15:04:59 2013
@@ -0,0 +1,98 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.core.throttler;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.core.system.ManifoldCF;
+
+/** Connection tracking for a bin.
+*
+* This class keeps track of information needed to figure out throttling for connections,
+* on a bin-by-bin basis. It is *not*, however, a connection pool. Actually establishing
+* connections, and pooling established connections, is functionality that must reside in the
+* caller.
+*
+* NOTE WELL: This is entirely local in operation
+*/
+public class ConnectionBin
+{
+ /** This is the name of the bin whose connections this object tracks; it is fixed at construction. */
+ protected final String binName;
+ /** This is the number of connections in this bin that are signed out and presumably in use; it starts at zero. */
+ protected int inUseConnections = 0;
+ /** This is the last time a fetch was done on this bin; it stays 0 until setLastFetchTime() is given a larger value. */
+ protected long lastFetchTime = 0L;
+ /** This object is what we synchronize on when we are waiting on a connection to free up for this
+ * bin. This is a separate object, because we also want to protect the integrity of the
+ * ConnectionBin object itself, for which we'll use the ConnectionBin's synchronizer. NOTE(review): no method shown in this class waits on or notifies this object -- presumably callers do; confirm. */
+ protected final Integer connectionWait = new Integer(0);
+
+ /** Constructor. The binName argument is the name of the bin this tracker is for; it is stored as-is. */
+ public ConnectionBin(String binName)
+ {
+ this.binName = binName;
+ }
+
+ /** Get the bin name, exactly as it was passed to the constructor. */
+ public String getBinName()
+ {
+ return binName;
+ }
+
+ /** Note the creation of an active connection that belongs to this bin, by incrementing the in-use count.
+ * The slots all must have been reserved prior to the connection being created; this method performs no limit check itself.
+ */
+ public synchronized void noteConnectionCreation()
+ {
+ inUseConnections++;
+ }
+
+ /** Note the destruction of an active connection that belongs to this bin, by decrementing the in-use count.
+ * NOTE(review): the count is not guarded against going negative; callers presumably pair this with noteConnectionCreation() -- confirm. */
+ public synchronized void noteConnectionDestruction()
+ {
+ inUseConnections--;
+ }
+
+ /** Note a new time for connection fetch for this bin. The stored time only moves forward: a value that is not later than the current one is ignored.
+ *@param currentTime is the time the fetch was started.
+ */
+ public synchronized void setLastFetchTime(long currentTime)
+ {
+ if (currentTime > lastFetchTime)
+ lastFetchTime = currentTime;
+ }
+
+ /** Get the last fetch time.
+ *@return the latest time recorded by setLastFetchTime(), or 0 if none has been recorded.
+ */
+ public synchronized long getLastFetchTime()
+ {
+ return lastFetchTime;
+ }
+
+ /** Count connections that are in use.
+ *@return the current in-use count, i.e. creations noted minus destructions noted.
+ */
+ public synchronized int countConnections()
+ {
+ return inUseConnections;
+ }
+}
+
Propchange: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ConnectionBin.java
------------------------------------------------------------------------------
svn:keywords = Id
Added: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java?rev=1550432&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java (added)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java Thu Dec 12 15:04:59 2013
@@ -0,0 +1,240 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.core.throttler;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.core.system.ManifoldCF;
+
+/** Throttles for a bin.
+* An instance of this class keeps track of the information needed to bandwidth throttle access
+* to a url belonging to a specific bin.
+*
+* In order to calculate
+* the effective "burst" fetches per second and bytes per second, we need to have some idea what the window is.
+* For example, a long hiatus from fetching could cause overuse of the server when fetching resumes, if the
+* window length is too long.
+*
+* One solution to this problem would be to keep a list of the individual fetches as records. Then, we could
+* "expire" a fetch by discarding the old record. However, this consumes quite a lot of memory for all but the
+* smallest intervals.
+*
+* Another, better, solution is to hook into the start and end of individual fetches. These will, presumably, occur
+* at the fastest possible rate without long pauses spent doing something else. The only complication is that
+* fetches may well overlap, so we need to "reference count" the fetches to know when to reset the counters.
+* For "fetches per second", we can simply make sure we "schedule" the next fetch at an appropriate time, rather
+* than keep records around. The overall rate may therefore be somewhat less than the specified rate, but that's perfectly
+* acceptable.
+*
+* Some notes on the algorithms used to limit server bandwidth impact
+* ==================================================================
+*
+* In a single connection case, the algorithm we'd want to use works like this. On the first chunk of a series,
+* the total length of time and the number of bytes are recorded. Then, prior to each subsequent chunk, a calculation
+* is done which attempts to hit the bandwidth target by the end of the chunk read, using the rate of the first chunk
+* access as a way of estimating how long it will take to fetch those next n bytes.
+*
+* For a multi-connection case, which this is, it's harder to come up with a good maximum bandwidth estimate,
+* and harder still to "hit the target", because simultaneous fetches will intrude. The strategy is therefore:
+*
+* 1) The first chunk of any series should proceed without interference from other connections to the same server.
+* The goal here is to get a decent quality estimate without any possibility of overwhelming the server.
+*
+* 2) The bandwidth of the first chunk is treated as the "maximum bandwidth per connection". That is, if other
+* connections are going on, we can presume that each connection will use at most the bandwidth that the first fetch
+* took. Thus, by generating end-time estimates based on this number, we are actually being conservative and
+* using less server bandwidth.
+*
+* 3) For chunks that have started but not finished, we keep track of their size and estimated elapsed time in order to schedule when
+* new chunks from other connections can start.
+*
+* NOTE WELL: This is entirely local in operation
+*/
+public class ThrottleBin
+{
+ /** This is the name of the bin which this throttle belongs to; it is fixed at construction. */
+ protected final String binName;
+ /** This is the reference count for this bin (which records active references): beginFetch() increments it, abortFetch() and endFetch() decrement it. */
+ protected volatile int refCount = 0;
+ /** The inverse rate estimate taken from the first read of the series, in ms/byte; 0.0 until an estimate has been made. */
+ protected double rateEstimate = 0.0;
+ /** Flag indicating whether rateEstimate currently holds a usable estimate; while false, the next read to start is used to produce one. */
+ protected volatile boolean estimateValid = false;
+ /** Flag indicating whether the read that will produce the rate estimate is currently in progress; other reads wait while this is true. */
+ protected volatile boolean estimateInProgress = false;
+ /** The start time of this series, as sampled from System.currentTimeMillis() by the estimating read; -1 when no series has started. */
+ protected long seriesStartTime = -1L;
+ /** Total actual bytes read in this series; this includes fetches in progress. Initially -1, and reset to 0 by beginFetch() when no references are active. */
+ protected long totalBytesRead = -1L;
+
+ /** Constructor. The binName argument is the name of the bin this throttle is for; it is stored as-is. */
+ public ThrottleBin(String binName)
+ {
+ this.binName = binName;
+ }
+
+ /** Get the bin name, exactly as it was passed to the constructor. */
+ public String getBinName()
+ {
+ return binName;
+ }
+
+ /** Note the start of a fetch operation for a bin. Call this method just before the actual stream access begins.
+ * Takes a reference on the bin; if no references were active, the series counters and the rate estimate are reset first.
+ * NOTE(review): as written this method never waits, although it is declared to throw InterruptedException. */
+ public void beginFetch()
+ throws InterruptedException
+ {
+ synchronized (this)
+ {
+ if (refCount == 0)
+ {
+ // No fetch is active, so this starts a new series: reset bandwidth throttling counters
+ estimateValid = false;
+ rateEstimate = 0.0;
+ totalBytesRead = 0L;
+ estimateInProgress = false;
+ seriesStartTime = -1L;
+ }
+ refCount++;
+ }
+
+ }
+
+ /** Abort the fetch. Releases the reference taken by beginFetch(), without reporting whether it was the last one.
+ */
+ public void abortFetch()
+ {
+ synchronized (this)
+ {
+ refCount--;
+ }
+ }
+
+ /** Note the start of an individual byte read of a specified size. Call this method just before the
+ * read request takes place. If no valid rate estimate exists, this read becomes the estimating read and returns at once (other callers block until it ends or is aborted); otherwise this performs the necessary delay prior to reading.
+ * Here byteCount is the number of bytes about to be requested, and minimumMillisecondsPerBytePerServer is the minimum average ms per byte to enforce over the series. */
+ public void beginRead(int byteCount, double minimumMillisecondsPerBytePerServer)
+ throws InterruptedException
+ {
+ long currentTime = System.currentTimeMillis();
+
+ synchronized (this)
+ {
+ while (estimateInProgress)
+ wait();
+ if (estimateValid == false)
+ {
+ seriesStartTime = currentTime;
+ estimateInProgress = true;
+ // Add these bytes to the estimated total; endRead() corrects this once the actual count is known
+ totalBytesRead += (long)byteCount;
+ // Exit early; this thread is the estimating read, so it isn't going to do any waiting
+ return;
+ }
+ }
+
+ // It is possible for the following code to get interrupted. If that happens,
+ // we have to unstick the threads that are waiting on the estimate! (The finally block below calls abortRead() for that.)
+ boolean finished = false;
+ try
+ {
+ long waitTime = 0L;
+ synchronized (this)
+ {
+ // Add these bytes to the estimated total; endRead() corrects this once the actual count is known
+ totalBytesRead += (long)byteCount;
+
+ // Estimate the time this read will take (ms/byte estimate times bytes requested), and wait accordingly
+ long estimatedTime = (long)(rateEstimate * (double)byteCount);
+
+ // Figure out when the total byte count, including this read, should finish in order to meet the constraint
+ long desiredEndTime = seriesStartTime + (long)(((double)totalBytesRead) * minimumMillisecondsPerBytePerServer);
+
+ // The wait time is the difference between our desired end time, minus the estimated time to read the data, and the
+ // current time. It may come out negative, in which case no sleep is done. NOTE(review): currentTime was sampled at method entry, before any wait() above, so the sleep may be longer than needed -- confirm that is intended.
+ waitTime = (desiredEndTime - estimatedTime) - currentTime;
+ }
+
+ if (waitTime > 0L)
+ {
+ ManifoldCF.sleep(waitTime);
+ }
+ finished = true;
+ }
+ finally
+ {
+ if (!finished)
+ {
+ abortRead();
+ }
+ }
+ }
+
+ /** Abort a read in progress. If an estimating read is in progress, its in-progress flag is cleared and all threads blocked in beginRead() are woken; otherwise this does nothing.
+ */
+ public void abortRead()
+ {
+ synchronized (this)
+ {
+ if (estimateInProgress)
+ {
+ estimateInProgress = false;
+ notifyAll();
+ }
+ }
+ }
+
+ /** Note the end of an individual read from the server. Call this just after an individual read completes.
+ * Pass the count given to beginRead() as originalCount and the actual number of bytes read as actualCount; the series total is corrected by their difference.
+ * If this was the estimating read, the rate estimate is computed and marked valid, and waiting threads are woken. NOTE(review): a negative actualCount is not special-cased -- confirm callers never pass one. */
+ public void endRead(int originalCount, int actualCount)
+ {
+ long currentTime = System.currentTimeMillis();
+
+ synchronized (this)
+ {
+ totalBytesRead = totalBytesRead + (long)actualCount - (long)originalCount;
+ if (estimateInProgress)
+ {
+ if (actualCount == 0)
+ // Didn't actually get any bytes, so use 0.0 (this also avoids dividing by zero below)
+ rateEstimate = 0.0;
+ else
+ rateEstimate = ((double)(currentTime - seriesStartTime))/(double)actualCount;
+ estimateValid = true;
+ estimateInProgress = false;
+ notifyAll();
+ }
+ }
+ }
+
+ /** Note the end of a fetch operation. Call this method just after the fetch completes.
+ * Releases the reference taken by beginFetch() and returns true if, after that, no references remain active on this bin. */
+ public boolean endFetch()
+ {
+ synchronized (this)
+ {
+ refCount--;
+ return (refCount == 0);
+ }
+
+ }
+
+}
+
Propchange: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java
------------------------------------------------------------------------------
svn:keywords = Id