You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/11/10 14:00:51 UTC

svn commit: r1637833 - in /manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler: interfaces/ jobs/ system/

Author: kwright
Date: Mon Nov 10 13:00:51 2014
New Revision: 1637833

URL: http://svn.apache.org/r1637833
Log:
Take out priorityset field, and get everything to build again.

Modified:
    manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
    manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
    manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
    manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java
    manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java
    manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingActivity.java
    manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SetPriorityThread.java
    manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java

Modified: manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java?rev=1637833&r1=1637832&r2=1637833&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java (original)
+++ manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java Mon Nov 10 13:00:51 2014
@@ -217,21 +217,18 @@ public interface IJobManager
 
   /** Get a list of not-yet-processed documents to reprioritize.  Documents in all jobs will be
   * returned by this method.  Up to n document descriptions will be returned.
-  *@param currentTime is the current time stamp for this prioritization pass.  Avoid
-  *  picking up any documents that are labeled with this timestamp or after.
   *@param n is the maximum number of document descriptions desired.
   *@return the document descriptions.
   */
-  public DocumentDescription[] getNextNotYetProcessedReprioritizationDocuments(long currentTime, int n)
+  public DocumentDescription[] getNextNotYetProcessedReprioritizationDocuments(int n)
     throws ManifoldCFException;
 
   /** Save a set of document priorities.  In the case where a document was eligible to have its
   * priority set, but it no longer is eligible, then the provided priority will not be written.
-  *@param currentTime is the time in milliseconds since epoch.
   *@param descriptions are the document descriptions.
   *@param priorities are the desired priorities.
   */
-  public void writeDocumentPriorities(long currentTime, DocumentDescription[] descriptions, IPriorityCalculator[] priorities)
+  public void writeDocumentPriorities(DocumentDescription[] descriptions, IPriorityCalculator[] priorities)
     throws ManifoldCFException;
 
   // This method supports the "expiration" thread
@@ -414,7 +411,7 @@ public interface IJobManager
   *@param documentDescriptions is the set of description objects for the documents that have had their parent carrydown information changed.
   *@param docPriorities are the document priorities to assign to the documents, if needed.
   */
-  public void carrydownChangeDocumentMultiple(DocumentDescription[] documentDescriptions, long currentTime, IPriorityCalculator[] docPriorities)
+  public void carrydownChangeDocumentMultiple(DocumentDescription[] documentDescriptions, IPriorityCalculator[] docPriorities)
     throws ManifoldCFException;
 
   /** Requeue a document because of carrydown changes.
@@ -423,7 +420,7 @@ public interface IJobManager
   *@param documentDescription is the description object for the document that has had its parent carrydown information changed.
   *@param docPriority is the document priority to assign to the document, if needed.
   */
-  public void carrydownChangeDocument(DocumentDescription documentDescription, long currentTime, IPriorityCalculator docPriority)
+  public void carrydownChangeDocument(DocumentDescription documentDescription, IPriorityCalculator docPriority)
     throws ManifoldCFException;
 
   /** Requeue a document for further processing in the future.
@@ -558,14 +555,13 @@ public interface IJobManager
   *@param docIDs are the local document identifiers.
   *@param overrideSchedule is true if any existing document schedule should be overridden.
   *@param hopcountMethod is either accurate, nodelete, or neverdelete.
-  *@param currentTime is the current time in milliseconds since epoch.
   *@param documentPriorities are the document priorities corresponding to the document identifiers.
   *@param prereqEventNames are the events that must be completed before each document can be processed.
   */
   public void addDocumentsInitial(String processID,
     Long jobID, String[] legalLinkTypes,
     String[] docIDHashes, String[] docIDs, boolean overrideSchedule,
-    int hopcountMethod, long currentTime, IPriorityCalculator[] documentPriorities,
+    int hopcountMethod, IPriorityCalculator[] documentPriorities,
     String[][] prereqEventNames)
     throws ManifoldCFException;
 
@@ -650,7 +646,6 @@ public interface IJobManager
   *@param dataNames are the names of the data to carry down to the child from this parent.
   *@param dataValues are the values to carry down to the child from this parent, corresponding to dataNames above.  If CharacterInput objects are passed in here,
   *       it is the caller's responsibility to clean these up.
-  *@param currentTime is the time in milliseconds since epoch that will be recorded for this operation.
   *@param priority is the desired document priority for the document.
   *@param prereqEventNames are the events that must be completed before the document can be processed.
   */
@@ -660,7 +655,7 @@ public interface IJobManager
     String parentIdentifierHash,
     String relationshipType,
     int hopcountMethod, String[] dataNames, Object[][] dataValues,
-    long currentTime, IPriorityCalculator priority, String[] prereqEventNames)
+    IPriorityCalculator priority, String[] prereqEventNames)
     throws ManifoldCFException;
 
   /** Add documents to the queue in bulk.
@@ -680,7 +675,6 @@ public interface IJobManager
   *@param dataNames are the names of the data to carry down to the child from this parent.
   *@param dataValues are the values to carry down to the child from this parent, corresponding to dataNames above.  If CharacterInput objects are passed in here,
   *       it is the caller's responsibility to clean these up.
-  *@param currentTime is the time in milliseconds since epoch that will be recorded for this operation.
   *@param priorities are the desired document priorities for the documents.
   *@param prereqEventNames are the events that must be completed before each document can be processed.
   */
@@ -690,7 +684,7 @@ public interface IJobManager
     String parentIdentifierHash,
     String relationshipType,
     int hopcountMethod, String[][] dataNames, Object[][][] dataValues,
-    long currentTime, IPriorityCalculator[] priorities,
+    IPriorityCalculator[] priorities,
     String[][] prereqEventNames)
     throws ManifoldCFException;
 

Modified: manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1637833&r1=1637832&r2=1637833&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java (original)
+++ manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Mon Nov 10 13:00:51 2014
@@ -2016,13 +2016,11 @@ public class JobManager implements IJobM
 
   /** Get a list of not-yet-processed documents to reprioritize.  Documents in all jobs will be
   * returned by this method.  Up to n document descriptions will be returned.
-  *@param currentTime is the current time stamp for this prioritization pass.  Avoid
-  *  picking up any documents that are labeled with this timestamp or after.
   *@param n is the maximum number of document descriptions desired.
   *@return the document descriptions.
   */
   @Override
-  public DocumentDescription[] getNextNotYetProcessedReprioritizationDocuments(long currentTime, int n)
+  public DocumentDescription[] getNextNotYetProcessedReprioritizationDocuments(int n)
     throws ManifoldCFException
   {
     StringBuilder sb = new StringBuilder("SELECT ");
@@ -2034,41 +2032,10 @@ public class JobManager implements IJobM
       .append(jobQueue.docHashField).append(",")
       .append(jobQueue.docIDField).append(",")
       .append(jobQueue.jobIDField)
-      .append(" FROM ").append(jobQueue.getTableName()).append(" t0 WHERE ")
-      .append(database.buildConjunctionClause(list,new ClauseDescription[]{
-        new MultiClause(jobQueue.statusField,new Object[]{
-          JobQueue.statusToString(jobQueue.STATUS_HOPCOUNTREMOVED),
-          JobQueue.statusToString(jobQueue.STATUS_PENDING),
-          JobQueue.statusToString(jobQueue.STATUS_PENDINGPURGATORY)}),
-        new UnitaryClause(jobQueue.prioritySetField,"<",new Long(currentTime))}));
-    
-    sb.append(" AND ")
-      .append(jobQueue.checkActionField).append("=?");
-    list.add(jobQueue.actionToString(JobQueue.ACTION_RESCAN));
-
-    // Per CONNECTORS-290, we need to be leaving priorities blank for jobs that aren't using them,
-    // so this will be changed to not include jobs where the priorities have been bashed to null.
-    //
-    // I've included ALL states that might have non-null doc priorities.  This includes states
-    // corresponding to uninstalled connectors, since there is no transition that cleans out the
-    // document priorities in these states.  The time during which a connector is uninstalled is
-    // expected to be short, because typically this state is the result of an installation procedure
-    // rather than willful action on the part of a user.
-        
-    sb.append(" AND EXISTS(SELECT 'x' FROM ").append(jobs.getTableName()).append(" t1 WHERE ")
+      .append(" FROM ").append(jobQueue.getTableName()).append(" WHERE ")
       .append(database.buildConjunctionClause(list,new ClauseDescription[]{
-        new MultiClause("t1."+jobs.statusField,new Object[]{
-          Jobs.statusToString(Jobs.STATUS_STARTINGUP),
-          Jobs.statusToString(Jobs.STATUS_STARTINGUPMINIMAL),
-          Jobs.statusToString(Jobs.STATUS_ACTIVE),
-          Jobs.statusToString(Jobs.STATUS_ACTIVESEEDING),
-          Jobs.statusToString(Jobs.STATUS_ACTIVE_UNINSTALLED),
-          Jobs.statusToString(Jobs.STATUS_ACTIVESEEDING_UNINSTALLED)
-          }),
-        new JoinClause("t1."+jobs.idField,"t0."+jobQueue.jobIDField)}))
-      .append(") ");
-
-    sb.append(database.constructOffsetLimitClause(0,n));
+        new UnitaryClause(jobQueue.needPriorityField,jobQueue.needPriorityToString(true))})).append(" ")
+      .append(database.constructOffsetLimitClause(0,n));
 
     // Analyze jobqueue tables unconditionally, since it's become much more sensitive in 8.3 than it used to be.
     //jobQueue.unconditionallyAnalyzeTables();
@@ -2094,12 +2061,11 @@ public class JobManager implements IJobM
 
   /** Save a set of document priorities.  In the case where a document was eligible to have its
   * priority set, but it no longer is eligible, then the provided priority will not be written.
-  *@param currentTime is the time in milliseconds since epoch.
   *@param documentDescriptions are the document descriptions.
   *@param priorities are the desired priorities.
   */
   @Override
-  public void writeDocumentPriorities(long currentTime, DocumentDescription[] documentDescriptions, IPriorityCalculator[] priorities)
+  public void writeDocumentPriorities(DocumentDescription[] documentDescriptions, IPriorityCalculator[] priorities)
     throws ManifoldCFException
   {
 
@@ -2107,16 +2073,14 @@ public class JobManager implements IJobM
     while (true)
     {
       // This should be ordered by document identifier hash in order to prevent potential deadlock conditions
-      HashMap indexMap = new HashMap();
+      Map<String,Integer> indexMap = new HashMap<String,Integer>();
       String[] docIDHashes = new String[documentDescriptions.length];
 
-      int i = 0;
-      while (i < documentDescriptions.length)
+      for (int i = 0; i < documentDescriptions.length; i++)
       {
         String documentIDHash = documentDescriptions[i].getDocumentIdentifierHash() + ":"+documentDescriptions[i].getJobID();
         docIDHashes[i] = documentIDHash;
         indexMap.put(documentIDHash,new Integer(i));
-        i++;
       }
 
       java.util.Arrays.sort(docIDHashes);
@@ -2129,18 +2093,16 @@ public class JobManager implements IJobM
       {
 
         // Need to order the writes by doc id.
-        i = 0;
-        while (i < docIDHashes.length)
+        for (int i = 0; i < docIDHashes.length; i++)
         {
           String docIDHash = docIDHashes[i];
-          Integer x = (Integer)indexMap.remove(docIDHash);
+          Integer x = indexMap.remove(docIDHash);
           if (x == null)
             throw new ManifoldCFException("Assertion failure: duplicate document identifier jobid/hash detected!");
           int index = x.intValue();
           DocumentDescription dd = documentDescriptions[index];
           IPriorityCalculator priority = priorities[index];
-          jobQueue.writeDocPriority(currentTime,dd.getID(),priority);
-          i++;
+          jobQueue.writeDocPriority(dd.getID(),priority);
         }
         database.performCommit();
         break;
@@ -2470,10 +2432,7 @@ public class JobManager implements IJobM
     // But, after I did this, it was no longer necessary to have such a large transaction either.
 
 
-    // Anything older than 10 minutes ago is considered eligible for reprioritization.
-    long prioritizationTime = currentTime - 60000L * 10L;
-
-    ThrottleLimit vList = new ThrottleLimit(n,prioritizationTime);
+    ThrottleLimit vList = new ThrottleLimit(n);
 
     IResultSet jobconnections = jobs.getActiveJobConnections();
     HashMap connectionSet = new HashMap();
@@ -2755,8 +2714,7 @@ public class JobManager implements IJobM
       .append(jobQueue.docIDField).append(",t0.")
       .append(jobQueue.statusField).append(",t0.")
       .append(jobQueue.failTimeField).append(",t0.")
-      .append(jobQueue.failCountField).append(",t0.")
-      .append(jobQueue.prioritySetField).append(" FROM ").append(jobQueue.getTableName())
+      .append(jobQueue.failCountField).append(" FROM ").append(jobQueue.getTableName())
       .append(" t0 ").append(jobQueue.getGetNextDocumentsIndexHint()).append(" WHERE ");
     
     sb.append(database.buildConjunctionClause(list,new ClauseDescription[]{
@@ -4296,14 +4254,13 @@ public class JobManager implements IJobM
   *@param docIDs are the local document identifiers.
   *@param overrideSchedule is true if any existing document schedule should be overridden.
   *@param hopcountMethod is either accurate, nodelete, or neverdelete.
-  *@param currentTime is the current time in milliseconds since epoch.
   *@param documentPriorities are the document priorities corresponding to the document identifiers.
   *@param prereqEventNames are the events that must be completed before each document can be processed.
   */
   @Override
   public void addDocumentsInitial(String processID, Long jobID, String[] legalLinkTypes,
     String[] docIDHashes, String[] docIDs, boolean overrideSchedule,
-    int hopcountMethod, long currentTime, IPriorityCalculator[] documentPriorities,
+    int hopcountMethod, IPriorityCalculator[] documentPriorities,
     String[][] prereqEventNames)
     throws ManifoldCFException
   {
@@ -4390,12 +4347,12 @@ public class JobManager implements IJobM
             int status = jobQueue.stringToStatus((String)row.getValue(jobQueue.statusField));
             Long checkTimeValue = (Long)row.getValue(jobQueue.checkTimeField);
 
-            jobQueue.updateExistingRecordInitial(rowID,status,checkTimeValue,executeTime,currentTime,docPriority,docPrereqs,processID);
+            jobQueue.updateExistingRecordInitial(rowID,status,checkTimeValue,executeTime,docPriority,docPrereqs,processID);
           }
           else
           {
             // Not found.  Attempt an insert instead.  This may fail due to constraints, but if this happens, the whole transaction will be retried.
-            jobQueue.insertNewRecordInitial(jobID,docIDHash,docID,docPriority,executeTime,currentTime,docPrereqs,processID);
+            jobQueue.insertNewRecordInitial(jobID,docIDHash,docID,docPriority,executeTime,docPrereqs,processID);
           }
 
           z++;
@@ -4821,7 +4778,7 @@ public class JobManager implements IJobM
     String[] docIDHashes, String[] docIDs,
     String parentIdentifierHash, String relationshipType,
     int hopcountMethod, String[][] dataNames, Object[][][] dataValues,
-    long currentTime, IPriorityCalculator[] documentPriorities,
+    IPriorityCalculator[] documentPriorities,
     String[][] prereqEventNames)
     throws ManifoldCFException
   {
@@ -4996,7 +4953,7 @@ public class JobManager implements IJobM
           else
           {
             // Not found.  Attempt an insert instead.  This may fail due to constraints, but if this happens, the whole transaction will be retried.
-            jobQueue.insertNewRecord(jobID,docIDHash,reorderedDocumentIdentifiers[z],reorderedDocumentPriorities[z],0L,currentTime,reorderedDocumentPrerequisites[z]);
+            jobQueue.insertNewRecord(jobID,docIDHash,reorderedDocumentIdentifiers[z],reorderedDocumentPriorities[z],0L,reorderedDocumentPrerequisites[z]);
           }
 
         }
@@ -5022,7 +4979,7 @@ public class JobManager implements IJobM
             // helps us determine whether we're going to need to "flip" HOPCOUNTREMOVED documents
             // to the PENDING state.  If the new link ended in an existing record, THEN we need to flip them all!
             jobQueue.updateExistingRecord(jr.getRecordID(),jr.getStatus(),jr.getCheckTimeValue(),
-              0L,currentTime,carrydownChangesSeen[z] || (hopcountChangesSeen!=null && hopcountChangesSeen[z]),
+              0L,carrydownChangesSeen[z] || (hopcountChangesSeen!=null && hopcountChangesSeen[z]),
               reorderedDocumentPriorities[z],reorderedDocumentPrerequisites[z]);
             // Signal if we need to perform the flip
             if (hopcountChangesSeen != null && hopcountChangesSeen[z])
@@ -5093,7 +5050,6 @@ public class JobManager implements IJobM
   *@param hopcountMethod is the desired method for managing hopcounts.
   *@param dataNames are the names of the data to carry down to the child from this parent.
   *@param dataValues are the values to carry down to the child from this parent, corresponding to dataNames above.
-  *@param currentTime is the time in milliseconds since epoch that will be recorded for this operation.
   *@param priority is the desired document priority for the document.
   *@param prereqEventNames are the events that must be completed before the document can be processed.
   */
@@ -5102,13 +5058,13 @@ public class JobManager implements IJobM
     Long jobID, String[] legalLinkTypes, String docIDHash, String docID,
     String parentIdentifierHash, String relationshipType,
     int hopcountMethod, String[] dataNames, Object[][] dataValues,
-    long currentTime, IPriorityCalculator priority, String[] prereqEventNames)
+    IPriorityCalculator priority, String[] prereqEventNames)
     throws ManifoldCFException
   {
     addDocuments(processID,jobID,legalLinkTypes,
       new String[]{docIDHash},new String[]{docID},
       parentIdentifierHash,relationshipType,hopcountMethod,new String[][]{dataNames},
-      new Object[][][]{dataValues},currentTime,new IPriorityCalculator[]{priority},new String[][]{prereqEventNames});
+      new Object[][][]{dataValues},new IPriorityCalculator[]{priority},new String[][]{prereqEventNames});
   }
 
   /** Undo the addition of child documents to the queue, for a set of documents.
@@ -5460,7 +5416,7 @@ public class JobManager implements IJobM
   *@param docPriorities are the document priorities to assign to the documents, if needed.
   */
   @Override
-  public void carrydownChangeDocumentMultiple(DocumentDescription[] documentDescriptions, long currentTime, IPriorityCalculator[] docPriorities)
+  public void carrydownChangeDocumentMultiple(DocumentDescription[] documentDescriptions, IPriorityCalculator[] docPriorities)
     throws ManifoldCFException
   {
     if (documentDescriptions.length == 0)
@@ -5540,7 +5496,7 @@ public class JobManager implements IJobM
           if (jr != null)
             // It was an existing row; do the update logic; use the 'carrydown changes' flag = true all the time.
             jobQueue.updateExistingRecord(jr.getRecordID(),jr.getStatus(),jr.getCheckTimeValue(),
-              0L,currentTime,true,docPriorities[originalIndex],null);
+              0L,true,docPriorities[originalIndex],null);
           j++;
         }
         database.performCommit();
@@ -5578,10 +5534,10 @@ public class JobManager implements IJobM
   *@param docPriority is the document priority to assign to the document, if needed.
   */
   @Override
-  public void carrydownChangeDocument(DocumentDescription documentDescription, long currentTime, IPriorityCalculator docPriority)
+  public void carrydownChangeDocument(DocumentDescription documentDescription, IPriorityCalculator docPriority)
     throws ManifoldCFException
   {
-    carrydownChangeDocumentMultiple(new DocumentDescription[]{documentDescription},currentTime,new IPriorityCalculator[]{docPriority});
+    carrydownChangeDocumentMultiple(new DocumentDescription[]{documentDescription},new IPriorityCalculator[]{docPriority});
   }
 
   /** Sleep a random amount of time after a transaction abort.
@@ -8151,6 +8107,7 @@ public class JobManager implements IJobM
         // All the job's documents need to have their docpriority set to null, to clear dead wood out of the docpriority index.
         // See CONNECTORS-290.
         // We do this BEFORE updating the job state.
+        
         jobQueue.clearDocPriorities(jobID);
             
         IJobDescription jobDesc = jobs.load(jobID,true);
@@ -9448,30 +9405,30 @@ public class JobManager implements IJobM
   {
     // For each connection, there is (a) a number (which is the maximum per bin), and (b)
     // a current running count per bin.  These are stored as elements in a hash map.
-    protected HashMap connectionMap = new HashMap();
+    protected Map<String,ThrottleJobItem> connectionMap = new HashMap<String,ThrottleJobItem>();
 
     // The maximum number of jobs that have reached their chunk size limit that we
     // need
-    protected int n;
+    protected final int n;
 
     // This is the hash table that maps a job ID to the object that tracks the number
     // of documents already accumulated for this resultset.  The count of the number
     // of queue records we have is tallied by going through each job in this table
     // and adding the records outstanding for it.
-    protected HashMap jobQueueHash = new HashMap();
+    protected final Map<Long,QueueHashItem> jobQueueHash = new HashMap<Long,QueueHashItem>();
 
     // This is the map from jobid to connection name
-    protected HashMap jobConnection = new HashMap();
+    protected Map<Long,String> jobConnection = new HashMap<Long,String>();
 
     // This is the set of allowed connection names.  We discard all documents that are
     // not from that set.
-    protected HashMap activeConnections = new HashMap();
+    protected Map<String,IRepositoryConnector> activeConnections = new HashMap<String,IRepositoryConnector>();
 
     // This is the number of documents per set per connection.
-    protected HashMap setSizes = new HashMap();
+    protected Map<String,Integer> setSizes = new HashMap<String,Integer>();
 
     // These are the individual connection maximums, keyed by connection name.
-    protected HashMap maxConnectionCounts = new HashMap();
+    protected Map<String,MutableInteger> maxConnectionCounts = new HashMap<String,MutableInteger>();
 
     // This is the maximum number of documents per set over all the connections we are looking at.  This helps us establish a sanity limit.
     protected int maxSetSize = 0;
@@ -9480,19 +9437,15 @@ public class JobManager implements IJobM
     protected int documentsProcessed = 0;
 
     // This is where we accumulate blocking documents.  This is an arraylist of DocumentDescription objects.
-    protected ArrayList blockingDocumentArray = new ArrayList();
-
-    // Cutoff time for documents eligible for prioritization
-    protected long prioritizationTime;
+    protected final List<DocumentDescription> blockingDocumentArray = new ArrayList<DocumentDescription>();
 
     /** Constructor.
     * This class is built up piecemeal, so the constructor does nothing.
     *@param n is the maximum number of full job descriptions we want at this time.
     */
-    public ThrottleLimit(int n, long prioritizationTime)
+    public ThrottleLimit(int n)
     {
       this.n = n;
-      this.prioritizationTime = prioritizationTime;
       Logging.perf.debug("Limit instance created");
     }
 
@@ -9541,7 +9494,7 @@ public class JobManager implements IJobM
       if (Logging.perf.isDebugEnabled())
         Logging.perf.debug(" Adding fetch limit of "+Integer.toString(upperLimit)+" fetches for expression '"+regexp+"' for connection '"+connectionName+"'");
 
-      ThrottleJobItem ji = (ThrottleJobItem)connectionMap.get(connectionName);
+      ThrottleJobItem ji = connectionMap.get(connectionName);
       if (ji == null)
       {
         ji = new ThrottleJobItem();
@@ -9582,7 +9535,7 @@ public class JobManager implements IJobM
     /** Make a deep copy */
     public ThrottleLimit makeDeepCopy()
     {
-      ThrottleLimit rval = new ThrottleLimit(n,prioritizationTime);
+      ThrottleLimit rval = new ThrottleLimit(n);
       // Create a true copy of all the structures in which counts are kept.  The referential structures (e.g. connection hashes)
       // do not need a deep copy.
       rval.activeConnections = activeConnections;
@@ -9592,18 +9545,13 @@ public class JobManager implements IJobM
       rval.jobConnection = jobConnection;
       // The structures where counts are maintained DO need a deep copy.
       rval.documentsProcessed = documentsProcessed;
-      Iterator iter;
-      iter = connectionMap.keySet().iterator();
-      while (iter.hasNext())
+      for (String key : connectionMap.keySet())
       {
-        Object key = iter.next();
-        rval.connectionMap.put(key,((ThrottleJobItem)connectionMap.get(key)).duplicate());
+        rval.connectionMap.put(key,connectionMap.get(key).duplicate());
       }
-      iter = jobQueueHash.keySet().iterator();
-      while (iter.hasNext())
+      for (Long key : jobQueueHash.keySet())
       {
-        Object key = iter.next();
-        rval.jobQueueHash.put(key,((QueueHashItem)jobQueueHash.get(key)).duplicate());
+        rval.jobQueueHash.put(key,jobQueueHash.get(key).duplicate());
       }
       return rval;
     }
@@ -9651,13 +9599,13 @@ public class JobManager implements IJobM
       Long jobIDValue = (Long)row.getValue(JobQueue.jobIDField);
 
       // Get the connection name for this row
-      String connectionName = (String)jobConnection.get(jobIDValue);
+      String connectionName = jobConnection.get(jobIDValue);
       if (connectionName == null)
       {
         Logging.perf.debug(" Row does not have an eligible job - excluding");
         return false;
       }
-      IRepositoryConnector connectorInstance = (IRepositoryConnector)activeConnections.get(connectionName);
+      IRepositoryConnector connectorInstance = activeConnections.get(connectionName);
       if (connectorInstance == null)
       {
         Logging.perf.debug(" Row does not have an eligible connector instance - excluding");
@@ -9665,7 +9613,7 @@ public class JobManager implements IJobM
       }
 
       // Find the connection limit for this document
-      MutableInteger connectionLimit = (MutableInteger)maxConnectionCounts.get(connectionName);
+      MutableInteger connectionLimit = maxConnectionCounts.get(connectionName);
       if (connectionLimit != null)
       {
         if (connectionLimit.intValue() == 0)
@@ -9677,11 +9625,11 @@ public class JobManager implements IJobM
       }
 
       // Tally this item in the job queue hash, so we can detect when to stop
-      QueueHashItem queueItem = (QueueHashItem)jobQueueHash.get(jobIDValue);
+      QueueHashItem queueItem = jobQueueHash.get(jobIDValue);
       if (queueItem == null)
       {
         // Need to talk to the connector to get a max number of docs per chunk
-        int maxCount = ((Integer)setSizes.get(connectionName)).intValue();
+        int maxCount = setSizes.get(connectionName).intValue();
         queueItem = new QueueHashItem(maxCount);
         jobQueueHash.put(jobIDValue,queueItem);
 
@@ -9697,7 +9645,7 @@ public class JobManager implements IJobM
       documentsProcessed++;
       //scanRecord.addBins(binNames);
 
-      ThrottleJobItem item = (ThrottleJobItem)connectionMap.get(connectionName);
+      ThrottleJobItem item = connectionMap.get(connectionName);
 
       // If there is no schedule-based throttling on this connection, we're done.
       if (item == null)
@@ -9716,8 +9664,11 @@ public class JobManager implements IJobM
           if (Logging.perf.isDebugEnabled())
             Logging.perf.debug(" Bin "+binNames[j]+" has no more available fetches - excluding");
 
-          Object o = row.getValue(JobQueue.prioritySetField);
-          if (o == null || ((Long)o).longValue() <= prioritizationTime)
+          //???
+          //Object o = row.getValue(JobQueue.prioritySetField);
+          //if (o == null || ((Long)o).longValue() <= prioritizationTime)
+          // Fully enabling blocking document logic, for now.
+          if (true)
           {
             // Need to add a document descriptor based on this row to the blockingDocuments object!
             // This will cause it to be reprioritized preferentially, getting it out of the way if it shouldn't
@@ -9752,12 +9703,10 @@ public class JobManager implements IJobM
         return false;
 
       // If the number of chunks exceeds n, we are done
-      Iterator iter = jobQueueHash.keySet().iterator();
       int count = 0;
-      while (iter.hasNext())
+      for (Long jobID : jobQueueHash.keySet())
       {
-        Long jobID = (Long)iter.next();
-        QueueHashItem item = (QueueHashItem)jobQueueHash.get(jobID);
+        QueueHashItem item = jobQueueHash.get(jobID);
         count += item.getChunkCount();
         if (count > n)
           return false;
@@ -9774,7 +9723,7 @@ public class JobManager implements IJobM
   protected static class QueueHashItem
   {
     // The number of items per chunk for this job
-    int itemsPerChunk;
+    final int itemsPerChunk;
     // The number of chunks so far, INCLUDING incomplete chunks
     int chunkCount = 0;
     // The number of documents in the current incomplete chunk
@@ -9826,10 +9775,10 @@ public class JobManager implements IJobM
   protected static class ThrottleJobItem
   {
     /** These are the bin limits.  This is an array of ThrottleLimitSpec objects. */
-    protected ArrayList throttleLimits = new ArrayList();
+    protected List<ThrottleLimitSpec> throttleLimits = new ArrayList<ThrottleLimitSpec>();
     /** This is a map of the bins and their current counts. If an entry doesn't exist, it's considered to be
     * the same as maxBinCount. */
-    protected HashMap binCounts = new HashMap();
+    protected final Map<String,MutableInteger> binCounts = new HashMap<String,MutableInteger>();
 
     /** Constructor. */
     public ThrottleJobItem()
@@ -9859,11 +9808,9 @@ public class JobManager implements IJobM
     {
       ThrottleJobItem rval = new ThrottleJobItem();
       rval.throttleLimits = throttleLimits;
-      Iterator iter = binCounts.keySet().iterator();
-      while (iter.hasNext())
+      for (String key : binCounts.keySet())
       {
-        String key = (String)iter.next();
-        this.binCounts.put(key,((MutableInteger)binCounts.get(key)).duplicate());
+        this.binCounts.put(key,binCounts.get(key).duplicate());
       }
       return rval;
     }
@@ -9874,7 +9821,7 @@ public class JobManager implements IJobM
     */
     public boolean isEmpty(String binName)
     {
-      MutableInteger value = (MutableInteger)binCounts.get(binName);
+      MutableInteger value = binCounts.get(binName);
       int remaining;
       if (value == null)
       {
@@ -9893,7 +9840,7 @@ public class JobManager implements IJobM
     */
     public void decrement(String binName)
     {
-      MutableInteger value = (MutableInteger)binCounts.get(binName);
+      MutableInteger value = binCounts.get(binName);
       if (value == null)
       {
         int x = findMaxCount(binName);
@@ -9938,7 +9885,7 @@ public class JobManager implements IJobM
       int i = 0;
       while (i < throttleLimits.size())
       {
-        ThrottleLimitSpec spec = (ThrottleLimitSpec)throttleLimits.get(i++);
+        ThrottleLimitSpec spec = throttleLimits.get(i++);
         Pattern p = spec.getRegexp();
         Matcher m = p.matcher(binName);
         if (m.find())
@@ -9957,9 +9904,9 @@ public class JobManager implements IJobM
   protected static class ThrottleLimitSpec
   {
     /** Regexp */
-    protected Pattern regexp;
+    protected final Pattern regexp;
     /** The fetch limit for all bins matching that regexp */
-    protected int maxCount;
+    protected final int maxCount;
 
     /** Constructor */
     public ThrottleLimitSpec(String regexp, int maxCount)

Modified: manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java?rev=1637833&r1=1637832&r2=1637833&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java (original)
+++ manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java Mon Nov 10 13:00:51 2014
@@ -41,10 +41,10 @@ import java.util.*;
  * <tr><td>status</td><td>CHAR(1)</td><td></td></tr>
  * <tr><td>isseed</td><td>CHAR(1)</td><td></td></tr>
  * <tr><td>docpriority</td><td>FLOAT</td><td></td></tr>
- * <tr><td>priorityset</td><td>BIGINT</td><td></td></tr>
  * <tr><td>checkaction</td><td>CHAR(1)</td><td></td></tr>
  * <tr><td>processid</td><td>VARCHAR(16)</td><td></td></tr>
  * <tr><td>seedingprocessid</td><td>VARCHAR(16)</td><td></td></tr>
+ * <tr><td>needpriority</td><td>CHAR(1)</td><td></td></tr>
  * </table>
  * <br><br>
  * 
@@ -114,10 +114,10 @@ public class JobQueue extends org.apache
   public static final String failCountField = "failcount";
   public static final String isSeedField = "isseed";
   public static final String docPriorityField = "docpriority";
-  public static final String prioritySetField = "priorityset";
   public static final String checkActionField = "checkaction";
   public static final String processIDField = "processid";
   public static final String seedingProcessIDField = "seedingprocessid";
+  public static final String needPriorityField = "needpriority";
   
   public static final double noDocPriorityValue = 1e9;
   public static final Double nullDocPriority = new Double(noDocPriorityValue + 1.0);
@@ -205,10 +205,10 @@ public class JobQueue extends org.apache
         map.put(statusField,new ColumnDescription("CHAR(1)",false,false,null,null,false));
         map.put(isSeedField,new ColumnDescription("CHAR(1)",false,true,null,null,false));
         map.put(docPriorityField,new ColumnDescription("FLOAT",false,true,null,null,false));
-        map.put(prioritySetField,new ColumnDescription("BIGINT",false,false,null,null,false));
         map.put(checkActionField,new ColumnDescription("CHAR(1)",false,true,null,null,false));
         map.put(processIDField,new ColumnDescription("VARCHAR(16)",false,true,null,null,false));
         map.put(seedingProcessIDField,new ColumnDescription("VARCHAR(16)",false,true,null,null,false));
+        map.put(needPriorityField,new ColumnDescription("CHAR(1)",false,true,null,null,false));
         performCreate(map,null);
       }
       else
@@ -225,7 +225,7 @@ public class JobQueue extends org.apache
       IndexDescription jobSeedIndex = new IndexDescription(false,new String[]{isSeedField,jobIDField});
       IndexDescription failTimeIndex = new IndexDescription(false,new String[]{failTimeField,jobIDField});
       IndexDescription actionTimeStatusIndex = new IndexDescription(false,new String[]{statusField,checkActionField,checkTimeField});
-      IndexDescription prioritysetStatusIndex = new IndexDescription(false,new String[]{statusField,prioritySetField});
+      IndexDescription needPriorityIndex = new IndexDescription(false,new String[]{needPriorityField});
       // No evidence that the extra fields help at all, for any database...
       IndexDescription docpriorityIndex = new IndexDescription(false,new String[]{docPriorityField,statusField,checkActionField,checkTimeField});
 
@@ -239,6 +239,8 @@ public class JobQueue extends org.apache
 
         if (uniqueIndex != null && id.equals(uniqueIndex))
           uniqueIndex = null;
+        else if (needPriorityIndex != null && id.equals(needPriorityIndex))
+          needPriorityIndex = null;
         else if (jobStatusIndex != null && id.equals(jobStatusIndex))
           jobStatusIndex = null;
         else if (jobSeedIndex != null && id.equals(jobSeedIndex))
@@ -247,8 +249,6 @@ public class JobQueue extends org.apache
           failTimeIndex = null;
         else if (actionTimeStatusIndex != null && id.equals(actionTimeStatusIndex))
           actionTimeStatusIndex = null;
-        else if (prioritysetStatusIndex != null && id.equals(prioritysetStatusIndex))
-          prioritysetStatusIndex = null;
         else if (docpriorityIndex != null && id.equals(docpriorityIndex))
           docpriorityIndex = null;
         else if (indexName.indexOf("_pkey") == -1)
@@ -261,6 +261,9 @@ public class JobQueue extends org.apache
       if (jobStatusIndex != null)
         performAddIndex(null,jobStatusIndex);
 
+      if (needPriorityIndex != null)
+        performAddIndex(null,needPriorityIndex);
+      
       if (jobSeedIndex != null)
         performAddIndex(null,jobSeedIndex);
 
@@ -270,9 +273,6 @@ public class JobQueue extends org.apache
       if (actionTimeStatusIndex != null)
         performAddIndex(null,actionTimeStatusIndex);
 
-      if (prioritysetStatusIndex != null)
-        performAddIndex(null,prioritysetStatusIndex);
-
       if (docpriorityIndex != null)
         performAddIndex(null,docpriorityIndex);
 
@@ -733,7 +733,7 @@ public class JobQueue extends org.apache
     // Map COMPLETE to PENDINGPURGATORY
     HashMap map = new HashMap();
     map.put(statusField,statusToString(STATUS_PENDINGPURGATORY));
-    map.put(prioritySetField,new Long(0L));
+    map.put(needPriorityField,needPriorityToString(true));
     // Do not reset priorities here!  They should all be blank at this point.
     map.put(checkTimeField,new Long(0L));
     map.put(checkActionField,actionToString(ACTION_RESCAN));
@@ -791,7 +791,7 @@ public class JobQueue extends org.apache
     // Map COMPLETE to PENDINGPURGATORY.
     HashMap map = new HashMap();
     map.put(statusField,statusToString(STATUS_PENDINGPURGATORY));
-    map.put(prioritySetField,new Long(0L));
+    map.put(needPriorityField,needPriorityToString(true));
     // Do not reset priorities here!  They should all be blank at this point.
     map.put(checkTimeField,new Long(0L));
     map.put(checkActionField,actionToString(ACTION_RESCAN));
@@ -866,11 +866,11 @@ public class JobQueue extends org.apache
   }
 
   /** Write out a document priority */
-  public void writeDocPriority(long currentTime, Long rowID, IPriorityCalculator priority)
+  public void writeDocPriority(Long rowID, IPriorityCalculator priority)
     throws ManifoldCFException
   {
     HashMap map = new HashMap();
-    map.put(prioritySetField,new Long(currentTime));
+    map.put(needPriorityField,needPriorityToString(false));
     map.put(docPriorityField,new Double(priority.getDocumentPriority()));
     ArrayList list = new ArrayList();
     String query = buildConjunctionClause(list,new ClauseDescription[]{
@@ -879,12 +879,12 @@ public class JobQueue extends org.apache
     noteModifications(0,1,0);
   }
 
-  /** Clear all document priorities for a job */
+  /** Clear all document priorities for a job that is going to sleep */
   public void clearDocPriorities(Long jobID)
     throws ManifoldCFException
   {
     HashMap map = new HashMap();
-    map.put(prioritySetField,new Long(0L));
+    map.put(needPriorityField,needPriorityToString(false));
     map.put(docPriorityField,nullDocPriority);
     ArrayList list = new ArrayList();
     String query = buildConjunctionClause(list,new ClauseDescription[]{
@@ -913,7 +913,7 @@ public class JobQueue extends org.apache
       checkTimeValue = null;
       // Remove document priority; we don't want to pollute the queue.  See CONNECTORS-290.
       map.put(docPriorityField,nullDocPriority);
-      map.put(prioritySetField,new Long(0L));
+      map.put(needPriorityField,needPriorityToString(false));
       break;
     case STATUS_ACTIVENEEDRESCAN:
     case STATUS_ACTIVENEEDRESCANPURGATORY:
@@ -1186,7 +1186,7 @@ public class JobQueue extends org.apache
   * The record is presumed to exist and have been locked, via "FOR UPDATE".
   */
   public void updateExistingRecordInitial(Long recordID, int currentStatus, Long checkTimeValue,
-    long desiredExecuteTime, long currentTime, IPriorityCalculator desiredPriority, String[] prereqEvents,
+    long desiredExecuteTime, IPriorityCalculator desiredPriority, String[] prereqEvents,
     String processID)
     throws ManifoldCFException
   {
@@ -1225,7 +1225,7 @@ public class JobQueue extends org.apache
       map.put(failCountField,null);
       // Update the doc priority.
       map.put(docPriorityField,new Double(desiredPriority.getDocumentPriority()));
-      map.put(prioritySetField,new Long(currentTime));
+      map.put(needPriorityField,needPriorityToString(false));
       break;
 
     case STATUS_PENDING:
@@ -1282,7 +1282,7 @@ public class JobQueue extends org.apache
   *@param docID is the local document identifier.
   */
   public void insertNewRecordInitial(Long jobID, String docHash, String docID, IPriorityCalculator desiredDocPriority,
-    long desiredExecuteTime, long currentTime, String[] prereqEvents, String processID)
+    long desiredExecuteTime, String[] prereqEvents, String processID)
     throws ManifoldCFException
   {
     // No prerequisites should be possible at this point.
@@ -1302,7 +1302,7 @@ public class JobQueue extends org.apache
     map.put(seedingProcessIDField,processID);
     // Set the document priority
     map.put(docPriorityField,new Double(desiredDocPriority.getDocumentPriority()));
-    map.put(prioritySetField,new Long(currentTime));
+    map.put(needPriorityField,needPriorityToString(false));
     performInsert(map,null);
     prereqEventManager.addRows(recordID,prereqEvents);
     noteModifications(1,0,0);
@@ -1483,7 +1483,7 @@ public class JobQueue extends org.apache
   * The record is presumed to exist and have been locked, via "FOR UPDATE".
   */
   public void updateExistingRecord(Long recordID, int currentStatus, Long checkTimeValue,
-    long desiredExecuteTime, long currentTime, boolean otherChangesSeen,
+    long desiredExecuteTime, boolean otherChangesSeen,
     IPriorityCalculator desiredPriority, String[] prereqEvents)
     throws ManifoldCFException
   {
@@ -1501,7 +1501,7 @@ public class JobQueue extends org.apache
       map.put(failCountField,null);
       // Going into pending: set the docpriority.
       map.put(docPriorityField,new Double(desiredPriority.getDocumentPriority()));
-      map.put(prioritySetField,new Long(currentTime));
+      map.put(needPriorityField,needPriorityToString(false));
       break;
     case STATUS_COMPLETE:
     case STATUS_BEINGCLEANED:
@@ -1518,7 +1518,7 @@ public class JobQueue extends org.apache
         map.put(failCountField,null);
         // Going into pending: set the docpriority.
         map.put(docPriorityField,new Double(desiredPriority.getDocumentPriority()));
-        map.put(prioritySetField,new Long(currentTime));
+        map.put(needPriorityField,needPriorityToString(false));
         break;
       }
       return;
@@ -1547,7 +1547,7 @@ public class JobQueue extends org.apache
         map.put(failCountField,null);
         // Going into pending: set the docpriority.
         map.put(docPriorityField,new Double(desiredPriority.getDocumentPriority()));
-        map.put(prioritySetField,new Long(currentTime));
+        map.put(needPriorityField,needPriorityToString(false));
         break;
       }
       return;
@@ -1571,7 +1571,7 @@ public class JobQueue extends org.apache
         map.put(failCountField,null);
         // Going into pending: set the docpriority.
         map.put(docPriorityField,new Double(desiredPriority.getDocumentPriority()));
-        map.put(prioritySetField,new Long(currentTime));
+        map.put(needPriorityField,needPriorityToString(false));
         break;
       }
       return;
@@ -1613,7 +1613,7 @@ public class JobQueue extends org.apache
   *
   */
   public void insertNewRecord(Long jobID, String docIDHash, String docID, IPriorityCalculator desiredDocPriority, long desiredExecuteTime,
-    long currentTime, String[] prereqEvents)
+    String[] prereqEvents)
     throws ManifoldCFException
   {
     HashMap map = new HashMap();
@@ -1627,7 +1627,7 @@ public class JobQueue extends org.apache
     map.put(statusField,statusToString(STATUS_PENDING));
     // Be sure to set the priority also
     map.put(docPriorityField,new Double(desiredDocPriority.getDocumentPriority()));
-    map.put(prioritySetField,new Long(currentTime));
+    map.put(needPriorityField,needPriorityToString(false));
     performInsert(map,null);
     prereqEventManager.addRows(recordID,prereqEvents);
     noteModifications(1,0,0);
@@ -1694,6 +1694,26 @@ public class JobQueue extends org.apache
     }
   }
 
+  /** Convert a need-priority field value to a boolean.
+  * Only the exact string "T" yields true; null or any other string yields false. */
+  public static boolean stringToNeedPriority(String value)
+    throws ManifoldCFException
+  {
+    if (value != null && value.equals("T"))
+      return true;
+    return false;
+  }
+  
+  /** Convert a boolean to its need-priority field value.
+  * Returns "T" when value is true and "F" when it is false. */
+  public static String needPriorityToString(boolean value)
+    throws ManifoldCFException
+  {
+    if (value)
+      return "T";
+    return "F";
+  }
+  
   /** Convert status field value to integer.
   *@param value is the string.
   *@return the integer.

Modified: manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java?rev=1637833&r1=1637832&r2=1637833&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java (original)
+++ manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CrawlerAgent.java Mon Nov 10 13:00:51 2014
@@ -188,6 +188,7 @@ public class CrawlerAgent implements IAg
       // activity.
       // In order for this to be the correct functionality, ALL reseeding and requeuing operations MUST reset the associated document
       // priorities.
+      // ??? -- start the process of reprioritization ONLY; don't do the whole thing.
       while (true)
       {
         long startTime = System.currentTimeMillis();
@@ -200,12 +201,12 @@ public class CrawlerAgent implements IAg
         }
         long updateTime = currentTimeValue.longValue();
         
-        DocumentDescription[] docs = jobManager.getNextNotYetProcessedReprioritizationDocuments(updateTime, 10000);
+        DocumentDescription[] docs = jobManager.getNextNotYetProcessedReprioritizationDocuments(10000);
         if (docs.length == 0)
           break;
 
         // Calculate new priorities for all these documents
-        ManifoldCF.writeDocumentPriorities(threadContext,docs,connectionMap,jobDescriptionMap,updateTime);
+        ManifoldCF.writeDocumentPriorities(threadContext,docs,connectionMap,jobDescriptionMap);
 
         Logging.threads.debug("Reprioritized "+Integer.toString(docs.length)+" not-yet-processed documents in "+new Long(System.currentTimeMillis()-startTime)+" ms");
       }

Modified: manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java?rev=1637833&r1=1637832&r2=1637833&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java (original)
+++ manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java Mon Nov 10 13:00:51 2014
@@ -946,7 +946,7 @@ public class ManifoldCF extends org.apac
     }
 
     // Now, requeue the documents with the new priorities
-    jobManager.carrydownChangeDocumentMultiple(requeueCandidates,currentTime,docPriorities);
+    jobManager.carrydownChangeDocumentMultiple(requeueCandidates,docPriorities);
   }
 
   /** Stuff colons so we can't have conflicts. */
@@ -1014,6 +1014,7 @@ public class ManifoldCF extends org.apac
     // activity.
     // In order for this to be the correct functionality, ALL reseeding and requeuing operations MUST reset the associated document
     // priorities.
+    // ???? Should only start the process of reprioritization, not complete it.
     while (true)
     {
       long startTime = System.currentTimeMillis();
@@ -1026,12 +1027,12 @@ public class ManifoldCF extends org.apac
       }
       long updateTime = currentTimeValue.longValue();
       
-      DocumentDescription[] docs = jobManager.getNextNotYetProcessedReprioritizationDocuments(updateTime, 10000);
+      DocumentDescription[] docs = jobManager.getNextNotYetProcessedReprioritizationDocuments(10000);
       if (docs.length == 0)
         break;
 
       // Calculate new priorities for all these documents
-      writeDocumentPriorities(threadContext,docs,connectionMap,jobDescriptionMap,updateTime);
+      writeDocumentPriorities(threadContext,docs,connectionMap,jobDescriptionMap);
 
       Logging.threads.debug("Reprioritized "+Integer.toString(docs.length)+" not-yet-processed documents in "+new Long(System.currentTimeMillis()-startTime)+" ms");
     }
@@ -1042,8 +1043,7 @@ public class ManifoldCF extends org.apac
   /** Write a set of document priorities, based on the current queue tracker.
   */
   public static void writeDocumentPriorities(IThreadContext threadContext, DocumentDescription[] descs,
-    Map<String,IRepositoryConnection> connectionMap, Map<Long,IJobDescription> jobDescriptionMap,
-    long currentTime)
+    Map<String,IRepositoryConnection> connectionMap, Map<Long,IJobDescription> jobDescriptionMap)
     throws ManifoldCFException
   {
     IRepositoryConnectorPool repositoryConnectorPool = RepositoryConnectorPoolFactory.make(threadContext);
@@ -1128,7 +1128,7 @@ public class ManifoldCF extends org.apac
     rt.preloadBinValues();
     
     // Now, write all the priorities we can.
-    jobManager.writeDocumentPriorities(currentTime,descs,priorities);
+    jobManager.writeDocumentPriorities(descs,priorities);
 
     rt.clearPreloadedValues();
   }

Modified: manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingActivity.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingActivity.java?rev=1637833&r1=1637832&r2=1637833&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingActivity.java (original)
+++ manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SeedingActivity.java Mon Nov 10 13:00:51 2014
@@ -215,22 +215,24 @@ public class SeedingActivity implements 
     throws ManifoldCFException
   {
     // First, prioritize the documents using the queue tracker
-    long prioritizationTime = System.currentTimeMillis();
     IPriorityCalculator[] docPriorities = new IPriorityCalculator[docIDHashes.length];
 
-    int i = 0;
-    while (i < docIDHashes.length)
+    rt.clearPreloadRequests();
+
+    for (int i = 0 ; i < docIDHashes.length ; i++)
     {
       // Calculate desired document priority based on current queuetracker status.
       String[] bins = connector.getBinNames(docIDs[i]);
-      docPriorities[i] = new PriorityCalculator(rt,connection,bins);
-
-      i++;
+      PriorityCalculator p = new PriorityCalculator(rt,connection,bins);
+      docPriorities[i] = p;
+      p.makePreloadRequest();
     }
 
+    rt.preloadBinValues();
+
     jobManager.addDocumentsInitial(processID,
       jobID,legalLinkTypes,docIDHashes,docIDs,overrideSchedule,hopcountMethod,
-      prioritizationTime,docPriorities,prereqEventNames);
+      docPriorities,prereqEventNames);
 
   }
 

Modified: manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SetPriorityThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SetPriorityThread.java?rev=1637833&r1=1637832&r2=1637833&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SetPriorityThread.java (original)
+++ manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/SetPriorityThread.java Mon Nov 10 13:00:51 2014
@@ -121,29 +121,29 @@ public class SetPriorityThread extends T
             }
 
             // Cycle through the current list of stuffer-identified documents until we come to the end.  Reprioritize these
-            // first.
+            // first.  NOTE: These documents will already have document priorities.
             DocumentDescription desc = blockingDocuments.getBlockingDocument();
             if (desc != null)
             {
               ManifoldCF.writeDocumentPriorities(threadContext,
-                new DocumentDescription[]{desc},connectionMap,jobDescriptionMap,currentTime);
+                new DocumentDescription[]{desc},connectionMap,jobDescriptionMap);
               processedCount++;
               continue;
             }
-            /* no longer useful given current architecture; only need to reprioritize blocking documents
+	    
             // Grab a list of document identifiers to set priority on.
             // We may well wind up calculating priority for documents that wind up having their
             // state changed before we can write back, but this is okay because update is only
             // going to be permitted for rows that still have the right state.
-            // I found that a limit of 1000 causes postgresql to basically do a linear scan, while a limit of 20 does not!
-            DocumentDescription[] descs = jobManager.getNextReprioritizationDocuments(currentTime,20);
+            DocumentDescription[] descs = jobManager.getNextNotYetProcessedReprioritizationDocuments(1000);
             if (descs.length > 0)
             {
-              writePriorities(threadContext,mgr,jobManager,descs,connectionMap,jobDescriptionMap,currentTime);
+              ManifoldCF.writeDocumentPriorities(threadContext,
+                descs,connectionMap,jobDescriptionMap);
               processedCount += descs.length;
               continue;
             }
-            */
+
             Logging.threads.debug("Done reprioritizing because no more documents to reprioritize");
             ManifoldCF.sleep(30000L);
             break;

Modified: manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1637833&r1=1637832&r2=1637833&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java (original)
+++ manifoldcf/branches/CONNECTORS-1100/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java Mon Nov 10 13:00:51 2014
@@ -1844,7 +1844,7 @@ public class WorkerThread extends Thread
 
         jobManager.addDocuments(processID,
           jobID,legalLinkTypes,docidHashes,docids,db.getParentIdentifierHash(),db.getLinkType(),hopcountMode,
-          dataNames,dataValues,currentTime,priorities,eventNames);
+          dataNames,dataValues,priorities,eventNames);
         
         rt.clearPreloadedValues();
       }