You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/15 07:04:55 UTC
svn commit: r1551001 - in
/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core:
interfaces/ throttler/
Author: kwright
Date: Sun Dec 15 06:04:55 2013
New Revision: 1551001
URL: http://svn.apache.org/r1551001
Log:
Hook up fetch bins
Modified:
manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java
manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IStreamThrottler.java
manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java
manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java
manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java
Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java?rev=1551001&r1=1551000&r2=1551001&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IFetchThrottler.java Sun Dec 15 06:04:55 2013
@@ -31,18 +31,11 @@ public interface IFetchThrottler
/** Get permission to fetch a document. This grants permission to start
* fetching a single document, within the connection that has already been
* granted permission that created this object. When done (or aborting), call
- * releaseFetchDocumentPermission() to note the completion of the document
+ * IStreamThrottler.closeStream() to note the completion of the document
* fetch activity.
- *@param currentTime is the current time, in ms. since epoch.
*@return the stream throttler to use to throttle the actual data access, or null if the system is being shut down.
*/
- public IStreamThrottler obtainFetchDocumentPermission(long currentTime)
+ public IStreamThrottler obtainFetchDocumentPermission()
throws InterruptedException;
- /** Release permission to fetch a document. Call this only when you
- * called obtainFetchDocumentPermission() successfully earlier.
- *@param currentTime is the current time, in ms. since epoch.
- */
- public void releaseFetchDocumentPermission(long currentTime);
-
}
Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IStreamThrottler.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IStreamThrottler.java?rev=1551001&r1=1551000&r2=1551001&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IStreamThrottler.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/interfaces/IStreamThrottler.java Sun Dec 15 06:04:55 2013
@@ -31,19 +31,20 @@ public interface IStreamThrottler
/** Obtain permission to read a block of bytes. This method may wait until it is OK to proceed.
* The throttle group, bin names, etc are already known
* to this specific interface object, so it is unnecessary to include them here.
- *@param currentTime is the current time, in ms. since epoch.
*@param byteCount is the number of bytes to get permissions to read.
*@return true if the wait took place as planned, or false if the system is being shut down.
*/
- public boolean obtainReadPermission(long currentTime, int byteCount)
+ public boolean obtainReadPermission(int byteCount)
throws InterruptedException;
/** Note the completion of the read of a block of bytes. Call this after
* obtainReadPermission() was successfully called, and bytes were successfully read.
- *@param currentTime is the current time, in ms. since epoch.
*@param origByteCount is the originally requested number of bytes to get permissions to read.
*@param actualByteCount is the number of bytes actually read.
*/
- public void releaseReadPermission(long currentTime, int origByteCount, int actualByteCount);
+ public void releaseReadPermission(int origByteCount, int actualByteCount);
+ /** Note the stream being closed, which marks the completion of the document fetch activity.
+ */
+ public void closeStream();
}
Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java?rev=1551001&r1=1551000&r2=1551001&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/FetchBin.java Sun Dec 15 06:04:55 2013
@@ -66,18 +66,23 @@ public class FetchBin
/** Reserve a request to fetch a document from this bin. The actual fetch is not yet committed
* with this call, but if it succeeds for all bins associated with the document, then the caller
* has permission to do the fetch, and can update the last fetch time.
+ *@return true if the reservation was made, or false if the fetch bin is being shut down.
*/
- public synchronized void reserveFetchRequest()
+ public synchronized boolean reserveFetchRequest()
throws InterruptedException
{
// First wait for the ability to even get the next fetch from this bin
while (true)
{
+ if (!isAlive)
+ return false;
if (!reserveNextFetch)
- break;
+ {
+ reserveNextFetch = true;
+ return true;
+ }
wait();
}
- reserveNextFetch = true;
}
/** Clear reserved request.
@@ -102,6 +107,7 @@ public class FetchBin
while (true)
{
if (!isAlive)
+ // Leave it to the caller to undo reservations
return false;
if (minTimeBetweenFetches == Long.MAX_VALUE)
{
@@ -114,22 +120,18 @@ public class FetchBin
// Compute how long we have to wait, based on the current time and the time of the last fetch.
long waitAmt = lastFetchTime + minTimeBetweenFetches - currentTime;
if (waitAmt <= 0L)
+ {
+ // Note actual time we start the fetch.
+ if (currentTime > lastFetchTime)
+ lastFetchTime = currentTime;
+ reserveNextFetch = false;
return true;
+ }
wait(waitAmt);
}
}
}
- /** Note the beginning of fetch of a document from this bin.
- *@param currentTime is the actual time the fetch was started.
- */
- public synchronized void beginFetch(long currentTime)
- {
- if (currentTime > lastFetchTime)
- lastFetchTime = currentTime;
- reserveNextFetch = false;
- }
-
/** Shut the bin down, and wake up all threads waiting on it.
*/
public synchronized void shutDown()
Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java?rev=1551001&r1=1551000&r2=1551001&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/ThrottleBin.java Sun Dec 15 06:04:55 2013
@@ -108,7 +108,6 @@ public class ThrottleBin
* May wait until schedule allows.
*/
public void beginFetch()
- throws InterruptedException
{
synchronized (this)
{
Modified: manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java?rev=1551001&r1=1551000&r2=1551001&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java (original)
+++ manifoldcf/branches/CONNECTORS-829/framework/core/src/main/java/org/apache/manifoldcf/core/throttler/Throttler.java Sun Dec 15 06:04:55 2013
@@ -431,15 +431,14 @@ public class Throttler
/** Release connection */
public void releaseConnectionPermission(String[] binNames)
{
- for (String binName : binNames)
+ synchronized (connectionBins)
{
- ConnectionBin bin;
- synchronized (connectionBins)
+ for (String binName : binNames)
{
- bin = connectionBins.get(binName);
+ ConnectionBin bin = connectionBins.get(binName);
+ if (bin != null)
+ bin.noteConnectionDestruction();
}
- if (bin != null)
- bin.noteConnectionDestruction();
}
}
@@ -453,32 +452,104 @@ public class Throttler
*@param currentTime is the current time, in ms. since epoch.
*@return the stream throttler to use to throttle the actual data access, or null if the system is being shut down.
*/
- public IStreamThrottler obtainFetchDocumentPermission(String[] binNames, long currentTime)
+ public IStreamThrottler obtainFetchDocumentPermission(String[] binNames)
throws InterruptedException
{
- // MHL
+ // First, make sure all the bins exist, and reserve a slot in each
+ int i = 0;
+ while (i < binNames.length)
+ {
+ String binName = binNames[i];
+ FetchBin bin;
+ synchronized (fetchBins)
+ {
+ bin = fetchBins.get(binName);
+ if (bin == null)
+ {
+ bin = new FetchBin(binName);
+ fetchBins.put(binName, bin);
+ }
+ }
+ // Reserve a slot
+ if (!bin.reserveFetchRequest())
+ {
+ // Release previous reservations, and return null
+ while (i > 0)
+ {
+ i--;
+ binName = binNames[i];
+ synchronized (fetchBins)
+ {
+ bin = fetchBins.get(binName);
+ }
+ if (bin != null)
+ bin.clearReservation();
+ }
+ return null;
+ }
+ i++;
+ }
+
+ // All reservations have been made! Convert them.
+ // (These may wait; they fail only if a bin is being shut down, which is handled below)
+ i = 0;
+ while (i < binNames.length)
+ {
+ String binName = binNames[i];
+ FetchBin bin;
+ synchronized (fetchBins)
+ {
+ bin = fetchBins.get(binName);
+ }
+ if (bin != null)
+ {
+ if (!bin.waitNextFetch())
+ {
+ // Undo the reservations we haven't processed yet
+ while (i < binNames.length)
+ {
+ binName = binNames[i];
+ synchronized (fetchBins)
+ {
+ bin = fetchBins.get(binName);
+ }
+ if (bin != null)
+ bin.clearReservation();
+ i++;
+ }
+ return null;
+ }
+ }
+ i++;
+ }
+
+ // Do a "begin fetch" for all throttle bins
+ synchronized (throttleBins)
+ {
+ for (String binName : binNames)
+ {
+ ThrottleBin bin = throttleBins.get(binName);
+ if (bin == null)
+ {
+ bin = new ThrottleBin(binName);
+ throttleBins.put(binName,bin);
+ }
+ bin.beginFetch();
+ }
+ }
+
return new StreamThrottler(this, binNames);
}
- /** Release permission to fetch a document. Call this only when you
- * called obtainFetchDocumentPermission() successfully earlier.
- *@param currentTime is the current time, in ms. since epoch.
- */
- public void releaseFetchDocumentPermission(String[] binNames, long currentTime)
- {
- // MHL
- }
-
// IStreamThrottler support methods
/** Obtain permission to read a block of bytes. This method may wait until it is OK to proceed.
* The throttle group, bin names, etc are already known
* to this specific interface object, so it is unnecessary to include them here.
- *@param currentTime is the current time, in ms. since epoch.
*@param byteCount is the number of bytes to get permissions to read.
*@return true if the wait took place as planned, or false if the system is being shut down.
*/
- public boolean obtainReadPermission(String[] binNames, long currentTime, int byteCount)
+ public boolean obtainReadPermission(String[] binNames, int byteCount)
throws InterruptedException
{
// MHL
@@ -487,15 +558,29 @@ public class Throttler
/** Note the completion of the read of a block of bytes. Call this after
* obtainReadPermission() was successfully called, and bytes were successfully read.
- *@param currentTime is the current time, in ms. since epoch.
*@param origByteCount is the originally requested number of bytes to get permissions to read.
*@param actualByteCount is the number of bytes actually read.
*/
- public void releaseReadPermission(String[] binNames, long currentTime, int origByteCount, int actualByteCount)
+ public void releaseReadPermission(String[] binNames, int origByteCount, int actualByteCount)
{
// MHL
}
+ /** Note the stream being closed.
+ */
+ public void closeStream(String[] binNames)
+ {
+ synchronized (throttleBins)
+ {
+ for (String binName : binNames)
+ {
+ ThrottleBin bin = throttleBins.get(binName);
+ if (bin != null)
+ bin.endFetch();
+ }
+ }
+ }
+
// Bookkeeping methods
/** Call this periodically.
@@ -657,26 +742,15 @@ public class Throttler
* granted permission that created this object. When done (or aborting), call
* releaseFetchDocumentPermission() to note the completion of the document
* fetch activity.
- *@param currentTime is the current time, in ms. since epoch.
*@return the stream throttler to use to throttle the actual data access, or null if the system is being shut down.
*/
@Override
- public IStreamThrottler obtainFetchDocumentPermission(long currentTime)
+ public IStreamThrottler obtainFetchDocumentPermission()
throws InterruptedException
{
- return parent.obtainFetchDocumentPermission(binNames, currentTime);
+ return parent.obtainFetchDocumentPermission(binNames);
}
- /** Release permission to fetch a document. Call this only when you
- * called obtainFetchDocumentPermission() successfully earlier.
- *@param currentTime is the current time, in ms. since epoch.
- */
- @Override
- public void releaseFetchDocumentPermission(long currentTime)
- {
- parent.releaseFetchDocumentPermission(binNames, currentTime);
- }
-
}
/** Stream throttler implementation class.
@@ -696,27 +770,33 @@ public class Throttler
/** Obtain permission to read a block of bytes. This method may wait until it is OK to proceed.
* The throttle group, bin names, etc are already known
* to this specific interface object, so it is unnecessary to include them here.
- *@param currentTime is the current time, in ms. since epoch.
*@param byteCount is the number of bytes to get permissions to read.
*@return true if the wait took place as planned, or false if the system is being shut down.
*/
@Override
- public boolean obtainReadPermission(long currentTime, int byteCount)
+ public boolean obtainReadPermission(int byteCount)
throws InterruptedException
{
- return parent.obtainReadPermission(binNames, currentTime, byteCount);
+ return parent.obtainReadPermission(binNames, byteCount);
}
/** Note the completion of the read of a block of bytes. Call this after
* obtainReadPermission() was successfully called, and bytes were successfully read.
- *@param currentTime is the current time, in ms. since epoch.
*@param origByteCount is the originally requested number of bytes to get permissions to read.
*@param actualByteCount is the number of bytes actually read.
*/
@Override
- public void releaseReadPermission(long currentTime, int origByteCount, int actualByteCount)
+ public void releaseReadPermission(int origByteCount, int actualByteCount)
+ {
+ parent.releaseReadPermission(binNames, origByteCount, actualByteCount);
+ }
+
+ /** Note the stream being closed.
+ */
+ @Override
+ public void closeStream()
{
- parent.releaseReadPermission(binNames, currentTime, origByteCount, actualByteCount);
+ parent.closeStream(binNames);
}
}