You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2013/12/09 10:29:58 UTC

svn commit: r1549520 - in /manifoldcf/trunk: CHANGES.txt framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java

Author: kwright
Date: Mon Dec  9 09:29:58 2013
New Revision: 1549520

URL: http://svn.apache.org/r1549520
Log:
Fix for CONNECTORS-830

Modified:
    manifoldcf/trunk/CHANGES.txt
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java

Modified: manifoldcf/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/CHANGES.txt?rev=1549520&r1=1549519&r2=1549520&view=diff
==============================================================================
--- manifoldcf/trunk/CHANGES.txt (original)
+++ manifoldcf/trunk/CHANGES.txt Mon Dec  9 09:29:58 2013
@@ -3,6 +3,10 @@ $Id$
 
 ======================= 1.5-dev =====================
 
+CONNECTORS-830: Stuffer thread throttling now works globally across the
+cluster, rather than being tracked independently by each stuffer thread.
+(Karl Wright)
+
 CONNECTORS-781: Add support for multiple agents processes.  This
 ticket involved changing much of the underlying infrastructure for
 the framework, including adding lock manager features, revamping the

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java?rev=1549520&r1=1549519&r2=1549520&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java Mon Dec  9 09:29:58 2013
@@ -32,6 +32,12 @@ public class StufferThread extends Threa
 {
   public static final String _rcsid = "@(#)$Id: StufferThread.java 988245 2010-08-23 18:39:35Z kwright $";
 
+  /** Name of the cluster-wide write lock that guards reading and updating the shared "last time" datum */
+  protected static final String stufferThreadLockName = "_STUFFERTHREAD_LOCK";
+  /** Name of the shared datum holding the time (milliseconds since epoch) at which any stuffer thread
+      in the cluster last claimed a stuffing interval; it is recorded before the stuffing query runs. */
+  protected static final String stufferThreadLastTimeDatumName = "_STUFFERTHREAD_LASTTIME";
+  
   // Local data
   
   /** This is a reference to the static main document queue */
@@ -87,16 +93,13 @@ public class StufferThread extends Threa
       IRepositoryConnectionManager mgr = RepositoryConnectionManagerFactory.make(threadContext);
       IIncrementalIngester ingester = IncrementalIngesterFactory.make(threadContext);
       IJobManager jobManager = JobManagerFactory.make(threadContext);
+      ILockManager lockManager = LockManagerFactory.make(threadContext);
       ReprioritizationTracker rt = new ReprioritizationTracker(threadContext);
 
       IRepositoryConnectorPool repositoryConnectorPool = RepositoryConnectorPoolFactory.make(threadContext);
       
       Logging.threads.debug("Stuffer thread: Low water mark is "+Integer.toString(lowWaterMark)+"; amount per stuffing is "+Integer.toString(stuffAmt));
 
-      // This is used to adjust the number of records returned for jobs
-      // that are throttled.
-      long lastTime = System.currentTimeMillis();
-
       // Hashmap keyed by jobid and containing ArrayLists.
       // This way we can guarantee priority will do the right thing, because the
       // priority is per-job.  We CANNOT guarantee anything about scheduling order, however,
@@ -158,21 +161,40 @@ public class StufferThread extends Threa
           // What we want to do is load enough documents to completely fill n queued document sets.
           // The number n passed in here thus cannot be used in a query to limit the number of returned
           // results.  Instead, it must be factored into the limit portion of the query.
+          
+          // Note well: the stuffer code stuffs based on intervals, so it is perfectly OK to 
+          // compute the interval for this request AND update the global "last time" even
+          // before actually firing off the query.  The worst that can happen is if the query
+          // fails, the interval will be "lost", and thus fewer documents will be stuffed than could
+          // be.
+          long stuffingStartTime;
+          long stuffingEndTime;
+          lockManager.enterWriteLock(stufferThreadLockName);
+          try
+          {
+            stuffingStartTime = readLastTime(lockManager);
+            stuffingEndTime = System.currentTimeMillis();
+            // Set the last time to be the current time
+            writeLastTime(lockManager,stuffingEndTime);
+          }
+          finally
+          {
+            lockManager.leaveWriteLock(stufferThreadLockName);
+          }
+
+          lastQueueStart = System.currentTimeMillis();
           DepthStatistics depthStatistics = new DepthStatistics();
-          long currentTime = System.currentTimeMillis();
-          lastQueueStart = currentTime;
-          DocumentDescription[] descs = jobManager.getNextDocuments(processID,stuffAmt,currentTime,currentTime-lastTime,
+          DocumentDescription[] descs = jobManager.getNextDocuments(processID,stuffAmt,stuffingEndTime,stuffingEndTime-stuffingStartTime,
             blockingDocuments,queueTracker.getCurrentStatistics(),depthStatistics);
           lastQueueEnd = System.currentTimeMillis();
           lastQueueFullResults = (descs.length == stuffAmt);
+          
+          // Assess what we've done.
+          rt.assessMinimumDepth(depthStatistics.getBins());
 
           if (Thread.currentThread().isInterrupted())
             throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
 
-          rt.assessMinimumDepth(depthStatistics.getBins());
-
-          // Set the last time to be the current time
-          lastTime = currentTime;
           if (Logging.threads.isDebugEnabled())
           {
             Logging.threads.debug("Stuffer thread: Found "+Integer.toString(descs.length)+" documents to queue");
@@ -407,4 +429,36 @@ public class StufferThread extends Threa
     }
   }
 
+  /** Read the cluster-wide "last stuffing time" datum.
+  * The datum is expected to be the 8-byte, least-significant-byte-first encoding produced by
+  * writeLastTime().
+  *@param lockManager is the lock manager used to read the shared datum.
+  *@return the stored time in milliseconds since epoch, or the current time if the datum is
+  *  missing or not exactly 8 bytes long.
+  */
+  protected static long readLastTime(ILockManager lockManager)
+    throws ManifoldCFException
+  {
+    byte[] data = lockManager.readData(stufferThreadLastTimeDatumName);
+    if (data == null || data.length != 8)
+      return System.currentTimeMillis();
+    // Rebuild the value from the most significant byte (data[7]) down to the least (data[0]).
+    // Each byte is masked with 0xff because Java bytes are signed, and the pieces are combined
+    // with '|' in explicit parentheses.  The previous "a & m + b & m + ..." form was wrong:
+    // '+' binds more tightly than '&', so it never reassembled the bytes.
+    long value = 0L;
+    for (int i = 7; i >= 0; i--)
+      value = (value << 8) | (((long)data[i]) & 0xffL);
+    return value;
+  }
+
+  /** Store the cluster-wide "last stuffing time" datum.
+  *@param lockManager is the lock manager used to write the shared datum.
+  *@param lastTime is the time to record, in milliseconds since epoch; it is encoded as 8 bytes,
+  *  least significant byte first.
+  */
+  protected static void writeLastTime(ILockManager lockManager, long lastTime)
+    throws ManifoldCFException
+  {
+    byte[] encoded = new byte[8];
+    long remaining = lastTime;
+    for (int index = 0; index < encoded.length; index++)
+    {
+      // Emit the current low-order byte, then move the next byte into position.
+      encoded[index] = (byte)(remaining & 0xffL);
+      remaining >>>= 8;
+    }
+    lockManager.writeData(stufferThreadLastTimeDatumName,encoded);
+  }
+  
 }