You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/09 10:29:58 UTC
svn commit: r1549520 - in /manifoldcf/trunk: CHANGES.txt
framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java
Author: kwright
Date: Mon Dec 9 09:29:58 2013
New Revision: 1549520
URL: http://svn.apache.org/r1549520
Log:
Fix for CONNECTORS-830
Modified:
manifoldcf/trunk/CHANGES.txt
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java
Modified: manifoldcf/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/CHANGES.txt?rev=1549520&r1=1549519&r2=1549520&view=diff
==============================================================================
--- manifoldcf/trunk/CHANGES.txt (original)
+++ manifoldcf/trunk/CHANGES.txt Mon Dec 9 09:29:58 2013
@@ -3,6 +3,10 @@ $Id$
======================= 1.5-dev =====================
+CONNECTORS-830: Stuffer thread throttling now works globally, across
+all agents processes in the cluster, not just within a single process.
+(Karl Wright)
+
CONNECTORS-781: Add support for multiple agents processes. This
ticket involved changing much of the underlying infrastructure for
the framework, including adding lock manager features, revamping the
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java?rev=1549520&r1=1549519&r2=1549520&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java Mon Dec 9 09:29:58 2013
@@ -32,6 +32,12 @@ public class StufferThread extends Threa
{
public static final String _rcsid = "@(#)$Id: StufferThread.java 988245 2010-08-23 18:39:35Z kwright $";
+ /** Name of the write lock that serializes access to the shared "last stuffing time" datum, so that
+ every stuffer thread in the cluster reads and updates that datum as one atomic step. */
+ protected final static String stufferThreadLockName = "_STUFFERTHREAD_LOCK";
+ /** Name of the lock-manager datum holding the last time, in milliseconds since epoch, that any stuffer
+ thread in the cluster claimed a stuffing interval. Note that the time is recorded just BEFORE the
+ document query is issued, so a failed query still advances it. Stored as 8 bytes, least-significant
+ byte first. */
+ protected final static String stufferThreadLastTimeDatumName = "_STUFFERTHREAD_LASTTIME";
+
// Local data
/** This is a reference to the static main document queue */
@@ -87,16 +93,13 @@ public class StufferThread extends Threa
IRepositoryConnectionManager mgr = RepositoryConnectionManagerFactory.make(threadContext);
IIncrementalIngester ingester = IncrementalIngesterFactory.make(threadContext);
IJobManager jobManager = JobManagerFactory.make(threadContext);
+ ILockManager lockManager = LockManagerFactory.make(threadContext);
ReprioritizationTracker rt = new ReprioritizationTracker(threadContext);
IRepositoryConnectorPool repositoryConnectorPool = RepositoryConnectorPoolFactory.make(threadContext);
Logging.threads.debug("Stuffer thread: Low water mark is "+Integer.toString(lowWaterMark)+"; amount per stuffing is "+Integer.toString(stuffAmt));
- // This is used to adjust the number of records returned for jobs
- // that are throttled.
- long lastTime = System.currentTimeMillis();
-
// Hashmap keyed by jobid and containing ArrayLists.
// This way we can guarantee priority will do the right thing, because the
// priority is per-job. We CANNOT guarantee anything about scheduling order, however,
@@ -158,21 +161,40 @@ public class StufferThread extends Threa
// What we want to do is load enough documents to completely fill n queued document sets.
// The number n passed in here thus cannot be used in a query to limit the number of returned
// results. Instead, it must be factored into the limit portion of the query.
+
+ // Note well: the stuffer code stuffs based on intervals, so it is perfectly OK to
+ // compute the interval for this request AND update the global "last time" even
+ // before actually firing off the query. The worst that can happen is if the query
+ // fails, the interval will be "lost", and thus fewer documents will be stuffed than could
+ // be.
+ long stuffingStartTime;
+ long stuffingEndTime;
+ lockManager.enterWriteLock(stufferThreadLockName);
+ try
+ {
+ stuffingStartTime = readLastTime(lockManager);
+ stuffingEndTime = System.currentTimeMillis();
+ // Set the last time to be the current time
+ writeLastTime(lockManager,stuffingEndTime);
+ }
+ finally
+ {
+ lockManager.leaveWriteLock(stufferThreadLockName);
+ }
+
+ lastQueueStart = System.currentTimeMillis();
DepthStatistics depthStatistics = new DepthStatistics();
- long currentTime = System.currentTimeMillis();
- lastQueueStart = currentTime;
- DocumentDescription[] descs = jobManager.getNextDocuments(processID,stuffAmt,currentTime,currentTime-lastTime,
+ DocumentDescription[] descs = jobManager.getNextDocuments(processID,stuffAmt,stuffingEndTime,stuffingEndTime-stuffingStartTime,
blockingDocuments,queueTracker.getCurrentStatistics(),depthStatistics);
lastQueueEnd = System.currentTimeMillis();
lastQueueFullResults = (descs.length == stuffAmt);
+
+ // Assess what we've done.
+ rt.assessMinimumDepth(depthStatistics.getBins());
if (Thread.currentThread().isInterrupted())
throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
- rt.assessMinimumDepth(depthStatistics.getBins());
-
- // Set the last time to be the current time
- lastTime = currentTime;
if (Logging.threads.isDebugEnabled())
{
Logging.threads.debug("Stuffer thread: Found "+Integer.toString(descs.length)+" documents to queue");
@@ -407,4 +429,36 @@ public class StufferThread extends Threa
}
}
+  /** Read the shared "last stuffing time" datum from the lock manager.
+  * The datum is expected to hold exactly 8 bytes, least-significant byte first (the layout
+  * written by writeLastTime). If the datum is missing or has the wrong length, the current
+  * time is returned instead, so a caller measuring an interval up to "now" gets (nearly) zero.
+  *@param lockManager is the lock manager that holds the datum.
+  *@return the stored time, in milliseconds since epoch.
+  */
+  protected static long readLastTime(ILockManager lockManager)
+    throws ManifoldCFException
+  {
+    byte[] data = lockManager.readData(stufferThreadLastTimeDatumName);
+    if (data == null || data.length != 8)
+      return System.currentTimeMillis();
+    // Reassemble the little-endian bytes, most-significant first. Each byte is masked with 0xff
+    // because Java bytes are signed. Bits are combined with '|' and explicit grouping: in Java
+    // '+' binds tighter than '&', so an unparenthesized mix of '&' and '+' does not decode.
+    long value = 0L;
+    for (int i = data.length - 1; i >= 0; i--)
+    {
+      value = (value << 8) | (((long)data[i]) & 0xffL);
+    }
+    return value;
+  }
+
+  /** Store a time as the shared "last stuffing time" datum in the lock manager.
+  * The value is encoded as 8 bytes, least-significant byte first.
+  *@param lockManager is the lock manager that will hold the datum.
+  *@param lastTime is the time to record, in milliseconds since epoch.
+  */
+  protected static void writeLastTime(ILockManager lockManager, long lastTime)
+    throws ManifoldCFException
+  {
+    byte[] encoded = new byte[8];
+    long remaining = lastTime;
+    for (int index = 0; index < encoded.length; index++)
+    {
+      // Emit the current low-order byte, then shift the next byte down into place.
+      encoded[index] = (byte)(remaining & 0xffL);
+      remaining >>>= 8;
+    }
+    lockManager.writeData(stufferThreadLastTimeDatumName,encoded);
+  }
+
}