You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2011/01/14 03:21:54 UTC

svn commit: r1058837 - in /incubator/lcf/trunk: ./ framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/ framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/ framework/pull-agent/src/main/java/org/apache/manifol...

Author: kwright
Date: Fri Jan 14 02:21:53 2011
New Revision: 1058837

URL: http://svn.apache.org/viewvc?rev=1058837&view=rev
Log:
Fix for CONNECTORS-145.

Modified:
    incubator/lcf/trunk/CHANGES.txt
    incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
    incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
    incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
    incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupSet.java
    incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupStufferThread.java
    incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java
    incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteSet.java
    incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteStufferThread.java
    incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteThread.java
    incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireStufferThread.java
    incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java

Modified: incubator/lcf/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/CHANGES.txt?rev=1058837&r1=1058836&r2=1058837&view=diff
==============================================================================
--- incubator/lcf/trunk/CHANGES.txt (original)
+++ incubator/lcf/trunk/CHANGES.txt Fri Jan 14 02:21:53 2011
@@ -3,6 +3,10 @@ $Id$
 
 ==================  0.2-dev ==================
 
+CONNECTORS-145: Refactor cleanup worker threads, expire threads, and
+document delete threads to handle failure of the output connection gracefully.
+(Karl Wright)
+
 CONNECTORS-130: Block the Solr output connector from accepting documents
 that have folder-level security.
 (Karl Wright)

Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java?rev=1058837&r1=1058836&r2=1058837&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java Fri Jan 14 02:21:53 2011
@@ -360,20 +360,25 @@ public interface IJobManager
   * current status and decide what the new status ought to be, based on a true rollback scenario.  Such cases, however, are rare enough so that
   * special logic is probably not worth it.
   *@param documentDescriptions is the set of description objects for the document that was processed.
+  *@param checkTime is the minimum time for the next deletion attempt.
   */
-  public void resetDeletingDocumentMultiple(DocumentDescription[] documentDescriptions)
+  public void resetDeletingDocumentMultiple(DocumentDescription[] documentDescriptions, long checkTime)
     throws ManifoldCFException;
 
   /** Reset a deleting document back to its former state.
   * This gets done when a deleting thread sees a service interruption, etc., from the ingestion system.
+  *@param documentDescription is the description object for the document that was being deleted.
+  *@param checkTime is the minimum time for the next deletion attempt.
   */
-  public void resetDeletingDocument(DocumentDescription documentDescription)
+  public void resetDeletingDocument(DocumentDescription documentDescription, long checkTime)
     throws ManifoldCFException;
 
   /** Reset a cleaning document back to its former state.
-  * This gets done when a deleting thread sees a service interruption, etc., from the ingestion system.
+  * This gets done when a cleaning thread sees a service interruption, etc., from the ingestion system.
+  *@param documentDescription is the description object for the document that was cleaned.
+  *@param checkTime is the minimum time for the next cleaning attempt.
   */
-  public void resetCleaningDocument(DocumentDescription documentDescription)
+  public void resetCleaningDocument(DocumentDescription documentDescription, long checkTime)
     throws ManifoldCFException;
 
   /** Reset a set of cleaning documents for further processing in the future.
@@ -383,8 +388,9 @@ public interface IJobManager
   * current status and decide what the new status ought to be, based on a true rollback scenario.  Such cases, however, are rare enough so that
   * special logic is probably not worth it.
   *@param documentDescriptions is the set of description objects for the document that was cleaned.
+  *@param checkTime is the minimum time for the next cleaning attempt.
   */
-  public void resetCleaningDocumentMultiple(DocumentDescription[] documentDescriptions)
+  public void resetCleaningDocumentMultiple(DocumentDescription[] documentDescriptions, long checkTime)
     throws ManifoldCFException;
 
   /** Add an initial set of documents to the queue.
@@ -744,17 +750,19 @@ public interface IJobManager
   /** Get list of deletable document descriptions.  This list will take into account
   * multiple jobs that may own the same document.
   *@param n is the maximum number of documents to return.
+  *@param currentTime is the current time; some fetches do not occur until a specific time.
   *@return the document descriptions for these documents.
   */
-  public DocumentDescription[] getNextDeletableDocuments(int n)
+  public DocumentDescription[] getNextDeletableDocuments(int n, long currentTime)
     throws ManifoldCFException;
 
   /** Get list of cleanable document descriptions.  This list will take into account
   * multiple jobs that may own the same document.
   *@param n is the maximum number of documents to return.
+  *@param currentTime is the current time; some fetches do not occur until a specific time.
   *@return the document descriptions for these documents.
   */
-  public DocumentSetAndFlags getNextCleanableDocuments(int n)
+  public DocumentSetAndFlags getNextCleanableDocuments(int n, long currentTime)
     throws ManifoldCFException;
 
   /** Delete ingested document identifiers (as part of deleting the owning job).

Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1058837&r1=1058836&r2=1058837&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Fri Jan 14 02:21:53 2011
@@ -781,9 +781,10 @@ public class JobManager implements IJobM
   * not in transition and are eligible, but are owned by other jobs, will have their
   * jobqueue entries deleted by this method.
   *@param maxCount is the maximum number of documents to return.
+  *@param currentTime is the current time; some fetches do not occur until a specific time.
   *@return the document descriptions for these documents.
   */
-  public DocumentSetAndFlags getNextCleanableDocuments(int maxCount)
+  public DocumentSetAndFlags getNextCleanableDocuments(int maxCount, long currentTime)
     throws ManifoldCFException
   {
     // The query will be built here, because it joins the jobs table against the jobqueue
@@ -830,6 +831,8 @@ public class JobManager implements IJobM
         ArrayList list = new ArrayList();
         list.add(jobQueue.statusToString(jobQueue.STATUS_PURGATORY));
         
+        list.add(new Long(currentTime));
+
         list.add(jobs.statusToString(jobs.STATUS_SHUTTINGDOWN));
         
         list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVE));
@@ -839,9 +842,11 @@ public class JobManager implements IJobM
         list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
         list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
         
+        // The "checktime IS NULL" test in the query below is for backwards compatibility
         IResultSet set = database.performQuery("SELECT "+jobQueue.idField+","+jobQueue.jobIDField+","+jobQueue.docHashField+","+jobQueue.docIDField+","+
           jobQueue.failTimeField+","+jobQueue.failCountField+" FROM "+
           jobQueue.getTableName()+" t0 WHERE t0."+jobQueue.statusField+"=? "+
+          " AND (t0."+jobQueue.checkTimeField+" IS NULL OR t0."+jobQueue.checkTimeField+"<=?) "+
           " AND EXISTS(SELECT 'x' FROM "+jobs.getTableName()+" t1 WHERE t0."+jobQueue.jobIDField+"=t1."+jobs.idField+
           " AND t1."+jobs.statusField+"=?"+
           ") AND NOT EXISTS(SELECT 'x' FROM "+jobQueue.getTableName()+" t2 WHERE t0."+jobQueue.docHashField+"=t2."+
@@ -1006,9 +1011,10 @@ public class JobManager implements IJobM
   * not in transition and are eligible, but are owned by other jobs, will have their
   * jobqueue entries deleted by this method.
   *@param maxCount is the maximum number of documents to return.
+  *@param currentTime is the current time; some fetches do not occur until a specific time.
   *@return the document descriptions for these documents.
   */
-  public DocumentDescription[] getNextDeletableDocuments(int maxCount)
+  public DocumentDescription[] getNextDeletableDocuments(int maxCount, long currentTime)
     throws ManifoldCFException
   {
     // The query will be built here, because it joins the jobs table against the jobqueue
@@ -1059,6 +1065,8 @@ public class JobManager implements IJobM
         list.add(jobQueue.statusToString(jobQueue.STATUS_PURGATORY));
         list.add(jobQueue.statusToString(jobQueue.STATUS_PENDINGPURGATORY));
         
+        list.add(new Long(currentTime));
+        
         list.add(jobs.statusToString(jobs.STATUS_READYFORDELETE));
         
         list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVE));
@@ -1068,9 +1076,11 @@ public class JobManager implements IJobM
         list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
         list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
         
+        // The "checktime IS NULL" test in the query below is for backwards compatibility
         IResultSet set = database.performQuery("SELECT "+jobQueue.idField+","+jobQueue.jobIDField+","+jobQueue.docHashField+","+jobQueue.docIDField+","+
           jobQueue.failTimeField+","+jobQueue.failCountField+" FROM "+
           jobQueue.getTableName()+" t0 WHERE t0."+jobQueue.statusField+" IN (?,?,?) "+
+          " AND (t0."+jobQueue.checkTimeField+" IS NULL OR t0."+jobQueue.checkTimeField+"<=?) "+
           " AND EXISTS(SELECT 'x' FROM "+jobs.getTableName()+" t1 WHERE t0."+jobQueue.jobIDField+"=t1."+jobs.idField+
           " AND t1."+jobs.statusField+"=?"+
           ") AND NOT EXISTS(SELECT 'x' FROM "+jobQueue.getTableName()+" t2 WHERE t0."+jobQueue.docHashField+"=t2."+
@@ -2807,8 +2817,9 @@ public class JobManager implements IJobM
   * current status and decide what the new status ought to be, based on a true rollback scenario.  Such cases, however, are rare enough so that
   * special logic is probably not worth it.
   *@param documentDescriptions is the set of description objects for the document that was cleaned.
+  *@param checkTime is the minimum time for the next cleaning attempt.
   */
-  public void resetCleaningDocumentMultiple(DocumentDescription[] documentDescriptions)
+  public void resetCleaningDocumentMultiple(DocumentDescription[] documentDescriptions, long checkTime)
     throws ManifoldCFException
   {
     Long[] ids = new Long[documentDescriptions.length];
@@ -2851,7 +2862,7 @@ public class JobManager implements IJobM
         i = 0;
         while (i < ids.length)
         {
-          jobQueue.setUncleaningStatus(ids[i]);
+          jobQueue.setUncleaningStatus(ids[i],checkTime);
           i++;
         }
 
@@ -2884,11 +2895,13 @@ public class JobManager implements IJobM
 
   /** Reset a cleaning document back to its former state.
   * This gets done when a deleting thread sees a service interruption, etc., from the ingestion system.
+  *@param documentDescription is the description of the document that was cleaned.
+  *@param checkTime is the minimum time for the next cleaning attempt.
   */
-  public void resetCleaningDocument(DocumentDescription documentDescription)
+  public void resetCleaningDocument(DocumentDescription documentDescription, long checkTime)
     throws ManifoldCFException
   {
-    resetCleaningDocumentMultiple(new DocumentDescription[]{documentDescription});
+    resetCleaningDocumentMultiple(new DocumentDescription[]{documentDescription},checkTime);
   }
 
   /** Reset a set of deleting documents for further processing in the future.
@@ -2898,8 +2911,9 @@ public class JobManager implements IJobM
   * current status and decide what the new status ought to be, based on a true rollback scenario.  Such cases, however, are rare enough so that
   * special logic is probably not worth it.
   *@param documentDescriptions is the set of description objects for the document that was processed.
+  *@param checkTime is the minimum time for the next deletion attempt.
   */
-  public void resetDeletingDocumentMultiple(DocumentDescription[] documentDescriptions)
+  public void resetDeletingDocumentMultiple(DocumentDescription[] documentDescriptions, long checkTime)
     throws ManifoldCFException
   {
     Long[] ids = new Long[documentDescriptions.length];
@@ -2942,7 +2956,7 @@ public class JobManager implements IJobM
         i = 0;
         while (i < ids.length)
         {
-          jobQueue.setUndeletingStatus(ids[i]);
+          jobQueue.setUndeletingStatus(ids[i],checkTime);
           i++;
         }
 
@@ -2975,11 +2989,13 @@ public class JobManager implements IJobM
 
   /** Reset a deleting document back to its former state.
   * This gets done when a deleting thread sees a service interruption, etc., from the ingestion system.
+  *@param documentDescription is the description object for the document that was being deleted.
+  *@param checkTime is the minimum time for the next deletion attempt.
   */
-  public void resetDeletingDocument(DocumentDescription documentDescription)
+  public void resetDeletingDocument(DocumentDescription documentDescription, long checkTime)
     throws ManifoldCFException
   {
-    resetDeletingDocumentMultiple(new DocumentDescription[]{documentDescription});
+    resetDeletingDocumentMultiple(new DocumentDescription[]{documentDescription},checkTime);
   }
 
 
@@ -6266,6 +6282,7 @@ public class JobManager implements IJobM
     list.add(jobQueue.statusToString(jobQueue.STATUS_PENDINGPURGATORY));
     
     list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+    list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
     
     list.add(jobQueue.actionToString(jobQueue.ACTION_RESCAN));
     list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVE));
@@ -6337,7 +6354,9 @@ public class JobManager implements IJobM
       .append(" OR ").append("t0.").append(jobQueue.statusField).append("=?")
       .append(")")
       .append(" THEN 'Waiting forever'")
-      .append(" WHEN ").append("t0.").append(jobQueue.statusField).append("=?")
+      .append(" WHEN (").append("t0.").append(jobQueue.statusField).append("=?")
+      .append(" OR ").append("t0.").append(jobQueue.statusField).append("=?")
+      .append(")")
       .append(" THEN 'Deleting'")
       .append(" WHEN ")
       .append("(t0.").append(jobQueue.checkActionField).append(" IS NULL OR t0.").append(jobQueue.checkActionField).append("=?)")

Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java?rev=1058837&r1=1058836&r2=1058837&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java Fri Jan 14 02:21:53 2011
@@ -308,12 +308,14 @@ public class JobQueue extends org.apache
 
     // Map BEINGDELETED to COMPLETE
     map.put(statusField,statusToString(STATUS_COMPLETE));
+    map.put(checkTimeField,new Long(0L));
     list.clear();
     list.add(statusToString(STATUS_BEINGDELETED));
     performUpdate(map,"WHERE "+statusField+"=?",list,null);
 
     // Map BEINGCLEANED to PURGATORY
     map.put(statusField,statusToString(STATUS_PURGATORY));
+    map.put(checkTimeField,new Long(0L));
     list.clear();
     list.add(statusToString(STATUS_BEINGCLEANED));
     performUpdate(map,"WHERE "+statusField+"=?",list,null);
@@ -382,6 +384,7 @@ public class JobQueue extends org.apache
     ArrayList list = new ArrayList();
     // Map BEINGDELETED to COMPLETE
     map.put(statusField,statusToString(STATUS_COMPLETE));
+    map.put(checkTimeField,new Long(0L));
     list.clear();
     list.add(statusToString(STATUS_BEINGDELETED));
     performUpdate(map,"WHERE "+statusField+"=?",list,null);
@@ -396,6 +399,7 @@ public class JobQueue extends org.apache
     ArrayList list = new ArrayList();
     // Map BEINGCLEANED to PURGATORY
     map.put(statusField,statusToString(STATUS_PURGATORY));
+    map.put(checkTimeField,new Long(0L));
     list.clear();
     list.add(statusToString(STATUS_BEINGCLEANED));
     performUpdate(map,"WHERE "+statusField+"=?",list,null);
@@ -423,7 +427,7 @@ public class JobQueue extends org.apache
     // Turn PENDINGPURGATORY, COMPLETED into PURGATORY.
     HashMap map = new HashMap();
     map.put(statusField,statusToString(STATUS_PURGATORY));
-    map.put(checkTimeField,null);
+    map.put(checkTimeField,new Long(0L));
     map.put(checkActionField,null);
     map.put(failTimeField,null);
     map.put(failCountField,null);
@@ -555,7 +559,7 @@ public class JobQueue extends org.apache
     case STATUS_ACTIVEPURGATORY:
       newStatus = STATUS_COMPLETE;
       actionFieldValue = null;
-      checkTimeValue = null;
+      checkTimeValue = new Long(0L);
       break;
     case STATUS_ACTIVENEEDRESCAN:
     case STATUS_ACTIVENEEDRESCANPURGATORY:
@@ -653,12 +657,12 @@ public class JobQueue extends org.apache
   }
 
   /** Set the status of a document to be "no longer deleting" */
-  public void setUndeletingStatus(Long id)
+  public void setUndeletingStatus(Long id, long checkTime)
     throws ManifoldCFException
   {
     HashMap map = new HashMap();
     map.put(statusField,statusToString(STATUS_COMPLETE));
-    map.put(checkTimeField,null);
+    map.put(checkTimeField,new Long(checkTime));
     map.put(checkActionField,null);
     map.put(failTimeField,null);
     map.put(failCountField,null);
@@ -681,12 +685,12 @@ public class JobQueue extends org.apache
   }
 
   /** Set the status of a document to be "no longer cleaning" */
-  public void setUncleaningStatus(Long id)
+  public void setUncleaningStatus(Long id, long checkTime)
     throws ManifoldCFException
   {
     HashMap map = new HashMap();
     map.put(statusField,statusToString(STATUS_PURGATORY));
-    map.put(checkTimeField,null);
+    map.put(checkTimeField,new Long(checkTime));
     map.put(checkActionField,null);
     map.put(failTimeField,null);
     map.put(failCountField,null);

Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupSet.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupSet.java?rev=1058837&r1=1058836&r2=1058837&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupSet.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupSet.java Fri Jan 14 02:21:53 2011
@@ -32,13 +32,25 @@ public class DocumentCleanupSet
 
   /** This is the array of documents to delete. */
   protected CleanupQueuedDocument[] documents;
+  /** The job description for this set of documents. */
+  protected IJobDescription jobDescription;
 
   /** Constructor.
   *@param documents is the arraylist representing the documents for this chunk.
+  *@param jobDescription is the job description for all the documents.
   */
-  public DocumentCleanupSet(CleanupQueuedDocument[] documents)
+  public DocumentCleanupSet(CleanupQueuedDocument[] documents, IJobDescription jobDescription)
   {
     this.documents = documents;
+    this.jobDescription = jobDescription;
+  }
+
+  /** Get the job description.
+  *@return the job description.
+  */
+  public IJobDescription getJobDescription()
+  {
+    return this.jobDescription;
   }
 
   /** Get the number of documents.

Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupStufferThread.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupStufferThread.java?rev=1058837&r1=1058836&r2=1058837&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupStufferThread.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupStufferThread.java Fri Jan 14 02:21:53 2011
@@ -96,11 +96,13 @@ public class DocumentCleanupStufferThrea
 
           Logging.threads.debug("Document cleanup stuffer thread woke up");
 
+          long currentTime = System.currentTimeMillis();
+          
+          // Get a single chunk at a time (but keep going until everything is stuffed)
           // This method will set the status of the documents in question
           // to "beingcleaned".
 
-          // Get a single chunk at a time (but keep going until everything is stuffed)
-          DocumentSetAndFlags documentsToClean = jobManager.getNextCleanableDocuments(deleteChunkSize);
+          DocumentSetAndFlags documentsToClean = jobManager.getNextCleanableDocuments(deleteChunkSize,currentTime);
           DocumentDescription[] descs = documentsToClean.getDocumentSet();
           boolean[] removeFromIndex = documentsToClean.getFlags();
           
@@ -116,16 +118,39 @@ public class DocumentCleanupStufferThrea
           if (Logging.threads.isDebugEnabled())
             Logging.threads.debug("Document cleanup stuffer thread found "+Integer.toString(descs.length)+" documents");
 
-          // Do the stuffing
-          CleanupQueuedDocument[] docDescs = new CleanupQueuedDocument[descs.length];
+          // Do the stuffing.  Each set must be segregated by job, since we need the job ID in the doc set.
+          Map jobMap = new HashMap();
           int k = 0;
-          while (k < docDescs.length)
+          while (k < descs.length)
           {
-            docDescs[k] = new CleanupQueuedDocument(descs[k],removeFromIndex[k]);
+            CleanupQueuedDocument x = new CleanupQueuedDocument(descs[k],removeFromIndex[k]);
+            Long jobID = descs[k].getJobID();
+            List y = (List)jobMap.get(jobID);
+            if (y == null)
+            {
+              y = new ArrayList();
+              jobMap.put(jobID,y);
+            }
+            y.add(x);
             k++;
           }
-          DocumentCleanupSet set = new DocumentCleanupSet(docDescs);
-          documentCleanupQueue.addDocuments(set);
+          
+          Iterator iter = jobMap.keySet().iterator();
+          while (iter.hasNext())
+          {
+            Long jobID = (Long)iter.next();
+            IJobDescription jobDescription = jobManager.load(jobID,true);
+            List y = (List)jobMap.get(jobID);
+            CleanupQueuedDocument[] docDescs = new CleanupQueuedDocument[y.size()];
+            k = 0;
+            while (k < docDescs.length)
+            {
+              docDescs[k] = (CleanupQueuedDocument)y.get(k);
+              k++;
+            }
+            DocumentCleanupSet set = new DocumentCleanupSet(docDescs,jobDescription);
+            documentCleanupQueue.addDocuments(set);
+          }
 
           // If we don't wait here, the other threads don't have a chance to queue anything else up.
           yield();

Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java?rev=1058837&r1=1058836&r2=1058837&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java Fri Jan 14 02:21:53 2011
@@ -104,198 +104,164 @@ public class DocumentCleanupThread exten
           if (Thread.currentThread().isInterrupted())
             throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
 
+          IJobDescription job = dds.getJobDescription();
+          String connectionName = job.getConnectionName();
+          String outputConnectionName = job.getOutputConnectionName();
+
           try
           {
             long currentTime = System.currentTimeMillis();
 
-            // We need to segregate all the documents by connection, in order to be able to form a decent activities object
-            // to pass into the incremental ingester.  So, first pass through the document descriptions will build that.
-            Map mappedDocs = new HashMap();
+            // Load the repository connection for this job.  All documents in the set share the job, and therefore the connection.
+            IRepositoryConnection connection = connMgr.load(connectionName);
+            
+            // This is where we store the hopcount cleanup data
+            ArrayList arrayDocHashes = new ArrayList();
+            ArrayList arrayDocsToDelete = new ArrayList();
+            ArrayList arrayRelationshipTypes = new ArrayList();
+            ArrayList hopcountMethods = new ArrayList();
+
             int j = 0;
             while (j < dds.getCount())
             {
-              CleanupQueuedDocument dqd = dds.getDocument(j++);
+              CleanupQueuedDocument dqd = dds.getDocument(j);
               DocumentDescription ddd = dqd.getDocumentDescription();
-              Long jobID = ddd.getJobID();
-              IJobDescription job = jobManager.load(jobID,true);
-              String connectionName = job.getConnectionName();
-              ArrayList list = (ArrayList)mappedDocs.get(connectionName);
-              if (list == null)
+              if (job != null && connection != null)
               {
-                list = new ArrayList();
-                mappedDocs.put(connectionName,list);
+                // We'll need the legal link types; grab those before we proceed
+                String[] legalLinkTypes = RepositoryConnectorFactory.getRelationshipTypes(threadContext,connection.getClassName());
+                if (legalLinkTypes != null)
+                {
+                  arrayDocHashes.add(ddd.getDocumentIdentifierHash());
+                  arrayDocsToDelete.add(dqd);
+                  arrayRelationshipTypes.add(legalLinkTypes);
+                  hopcountMethods.add(new Integer(job.getHopcountMode()));
+                }
               }
-              list.add(dqd);
+              j++;
             }
 
-            // Now, cycle through all represented connections.
-            // For each connection, construct the necessary pieces to do the deletion.
-            Iterator iter = mappedDocs.keySet().iterator();
-            while (iter.hasNext())
+            // Grab a connector instance for the job's connection.  If we fail, nothing is lost and retries are possible.
+            try
             {
-              String connectionName = (String)iter.next();
-              ArrayList list = (ArrayList)mappedDocs.get(connectionName);
-
-              // Produce a map of connection name->connection object.  We will use this to perform a request for multiple connector objects
-              IRepositoryConnection connection = connMgr.load(connectionName);
-              ArrayList arrayOutputConnectionNames = new ArrayList();
-              ArrayList arrayDocHashes = new ArrayList();
-              ArrayList arrayDocClasses = new ArrayList();
-              ArrayList arrayDocsToDelete = new ArrayList();
-              ArrayList arrayRelationshipTypes = new ArrayList();
-              ArrayList hopcountMethods = new ArrayList();
-              ArrayList connections = new ArrayList();
-              j = 0;
-              while (j < list.size())
+              IRepositoryConnector connector = RepositoryConnectorFactory.grab(threadContext,connection.getClassName(),connection.getConfigParams(),connection.getMaxConnections());
+              try
               {
-                CleanupQueuedDocument dqd = (CleanupQueuedDocument)list.get(j);
-                DocumentDescription ddd = dqd.getDocumentDescription();
 
-                Long jobID = ddd.getJobID();
-                IJobDescription job = jobManager.load(jobID,true);
-                if (job != null && connection != null)
+                // Track which documents may be removed from the job queue once index removal has been dealt with
+                boolean[] deleteFromQueue = new boolean[arrayDocHashes.size()];
+                    
+                // Count the number of docs to actually delete.  This will be a subset of the documents in the list.
+                int k = 0;
+                int removeCount = 0;
+                while (k < arrayDocHashes.size())
                 {
-                  // We'll need the legal link types; grab those before we proceed
-                  String[] legalLinkTypes = RepositoryConnectorFactory.getRelationshipTypes(threadContext,connection.getClassName());
-                  if (legalLinkTypes != null)
+                  if (((CleanupQueuedDocument)arrayDocsToDelete.get(k)).shouldBeRemovedFromIndex())
                   {
-                    arrayOutputConnectionNames.add(job.getOutputConnectionName());
-                    arrayDocClasses.add(connectionName);
-                    arrayDocHashes.add(ddd.getDocumentIdentifierHash());
-                    arrayDocsToDelete.add(dqd);
-                    arrayRelationshipTypes.add(legalLinkTypes);
-                    hopcountMethods.add(new Integer(job.getHopcountMode()));
+                    deleteFromQueue[k] = false;
+                    removeCount++;
                   }
+                  else
+                    deleteFromQueue[k] = true;
+                  k++;
                 }
-                j++;
-              }
-
-              // Next, segregate the documents by output connection name.  This will permit logging to know what actual activity type to use.
-              HashMap outputMap = new HashMap();
-              j = 0;
-              while (j < arrayDocHashes.size())
-              {
-                String outputConnectionName = (String)arrayOutputConnectionNames.get(j);
-                ArrayList subList = (ArrayList)outputMap.get(outputConnectionName);
-                if (subList == null)
+                    
+                // Allocate removal arrays
+                String[] docClassesToRemove = new String[removeCount];
+                String[] hashedDocsToRemove = new String[removeCount];
+
+                // Now, iterate over the list
+                k = 0;
+                removeCount = 0;
+                while (k < arrayDocHashes.size())
                 {
-                  subList = new ArrayList();
-                  outputMap.put(outputConnectionName,subList);
+                  if (((CleanupQueuedDocument)arrayDocsToDelete.get(k)).shouldBeRemovedFromIndex())
+                  {
+                    docClassesToRemove[removeCount] = connectionName;
+                    hashedDocsToRemove[removeCount] = (String)arrayDocHashes.get(k);
+                    removeCount++;
+                  }
+                  k++;
                 }
-                subList.add(new Integer(j));
-                j++;
-              }
+                
+                OutputRemoveActivity activities = new OutputRemoveActivity(connectionName,connMgr,outputConnectionName);
+
+                // Finally, go ahead and delete the documents from the ingestion system.
 
-              // Grab one connection for each connectionName.  If we fail, nothing is lost and retries are possible.
-              try
-              {
-                IRepositoryConnector connector = RepositoryConnectorFactory.grab(threadContext,connection.getClassName(),connection.getConfigParams(),connection.getMaxConnections());
                 try
                 {
-
-                  // Iterate over the outputs
-                  Iterator outputIterator = outputMap.keySet().iterator();
-                  while (outputIterator.hasNext())
+                  ingester.documentDeleteMultiple(outputConnectionName,docClassesToRemove,hashedDocsToRemove,activities);
+                  // Success!  Label all these as needing deletion from queue.
+                  k = 0;
+                  while (k < arrayDocHashes.size())
                   {
-                    String outputConnectionName = (String)outputIterator.next();
-                    ArrayList indexList = (ArrayList)outputMap.get(outputConnectionName);
-                    // Count the number of docs to actually delete.  This will be a subset of the documents in the list.
-                    int k = 0;
-                    int removeCount = 0;
-                    while (k < indexList.size())
-                    {
-                      int index = ((Integer)indexList.get(k++)).intValue();
-                      if (((CleanupQueuedDocument)arrayDocsToDelete.get(index)).shouldBeRemovedFromIndex())
-                        removeCount++;
-                    }
-                    
-                    // Allocate removal arrays
-                    String[] docClassesToRemove = new String[removeCount];
-                    String[] hashedDocsToRemove = new String[removeCount];
-
-                    // Now, iterate over the index list
-                    k = 0;
-                    removeCount = 0;
-                    while (k < indexList.size())
-                    {
-                      int index = ((Integer)indexList.get(k)).intValue();
-                      if (((CleanupQueuedDocument)arrayDocsToDelete.get(index)).shouldBeRemovedFromIndex())
-                      {
-                        docClassesToRemove[removeCount] = (String)arrayDocClasses.get(index);
-                        hashedDocsToRemove[removeCount] = (String)arrayDocHashes.get(index);
-                        removeCount++;
-                      }
-                      k++;
-                    }
-
-                    OutputRemoveActivity activities = new OutputRemoveActivity(connectionName,connMgr,outputConnectionName);
-
-                    // Finally, go ahead and delete the documents from the ingestion system.
-
-                    while (true)
-                    {
-                      try
-                      {
-                        ingester.documentDeleteMultiple(outputConnectionName,docClassesToRemove,hashedDocsToRemove,activities);
-                        break;
-                      }
-                      catch (ServiceInterruption e)
-                      {
-                        // If we get a service interruption here, it means that the ingestion API is down.
-                        // There is no point, therefore, in freeing up this thread to go do something else;
-                        // might as well just wait here for our retries.
-                        // Wait for the prescribed time
-                        long amt = e.getRetryTime();
-                        long now = System.currentTimeMillis();
-                        long waittime = amt-now;
-                        if (waittime <= 0L)
-                          waittime = 300000L;
-                        ManifoldCF.sleep(waittime);
-                      }
-                    }
-
-                    // Successfully deleted some documents from ingestion system.  Now, remove them from job queue.  This
-                    // must currently happen one document at a time, because the jobs and connectors for each document
-                    // potentially differ.
-                    k = 0;
-                    while (k < indexList.size())
+                    if (((CleanupQueuedDocument)arrayDocsToDelete.get(k)).shouldBeRemovedFromIndex())
+                      deleteFromQueue[k] = true;
+                    k++;
+                  }
+                }
+                catch (ServiceInterruption e)
+                {
+                  // We don't know which failed, or maybe they all did.
+                  // Go through the list of documents we just tried, and reset them on the queue based on the
+                  // ServiceInterruption parameters.  Then we must proceed to delete ONLY the documents that
+                  // were not part of the index deletion attempt.
+                  k = 0;
+                  while (k < arrayDocHashes.size())
+                  {
+                    CleanupQueuedDocument cqd = (CleanupQueuedDocument)arrayDocsToDelete.get(k);
+                    if (cqd.shouldBeRemovedFromIndex())
                     {
-                      int index = ((Integer)indexList.get(k)).intValue();
-
-                      DeleteQueuedDocument dqd = (DeleteQueuedDocument)arrayDocsToDelete.get(index);
-                      DocumentDescription ddd = dqd.getDocumentDescription();
-                      Long jobID = ddd.getJobID();
-                      int hopcountMethod = ((Integer)hopcountMethods.get(index)).intValue();
-                      String[] legalLinkTypes = (String[])arrayRelationshipTypes.get(index);
-                      DocumentDescription[] requeueCandidates = jobManager.markDocumentDeleted(jobID,legalLinkTypes,ddd,hopcountMethod);
-                      // Use the common method for doing the requeuing
-                      ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,
-                        connector,connection,queueTracker,currentTime);
-                      // Finally, completed expiration of the document.
-                      dqd.setProcessed();
-                      k++;
+                      DocumentDescription dd = cqd.getDocumentDescription();
+                      // To recover from a cleanup failure, requeue the document to PURGATORY.
+                      jobManager.resetCleaningDocument(dd,e.getRetryTime());
+                      cqd.setProcessed();
                     }
+                    k++;
                   }
                 }
-                finally
+
+                // Now remove from the job queue every document flagged in deleteFromQueue: those whose index removal
+                // succeeded, plus those that never needed index removal.  This is done one document at a time, since
+                // each removal can produce its own set of requeue candidates.
+                k = 0;
+                while (k < arrayDocHashes.size())
                 {
-                  // Free up the reserved connector instance
-                  RepositoryConnectorFactory.release(connector);
+                  if (deleteFromQueue[k])
+                  {
+                    DeleteQueuedDocument dqd = (DeleteQueuedDocument)arrayDocsToDelete.get(k);
+                    DocumentDescription ddd = dqd.getDocumentDescription();
+                    Long jobID = ddd.getJobID();
+                    int hopcountMethod = ((Integer)hopcountMethods.get(k)).intValue();
+                    String[] legalLinkTypes = (String[])arrayRelationshipTypes.get(k);
+                    DocumentDescription[] requeueCandidates = jobManager.markDocumentDeleted(jobID,legalLinkTypes,ddd,hopcountMethod);
+                    // Use the common method for doing the requeuing
+                    ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,
+                      connector,connection,queueTracker,currentTime);
+                    // Finally, completed cleanup of the document.
+                    dqd.setProcessed();
+                  }
+                  k++;
                 }
               }
-              catch (ManifoldCFException e)
+              finally
               {
-                if (e.getErrorCode() == ManifoldCFException.REPOSITORY_CONNECTION_ERROR)
-                {
-                  // This error can only come from grabbing the connections.  So, if this occurs it means that
-                  // all the documents we've been handed have to be stuffed back onto the queue for processing at a later time.
-                  Logging.threads.warn("Document cleanup thread couldn't establish necessary connections, retrying later: "+e.getMessage(),e);
+                // Free up the reserved connector instance
+                RepositoryConnectorFactory.release(connector);
+              }
+            }
+            catch (ManifoldCFException e)
+            {
+              if (e.getErrorCode() == ManifoldCFException.REPOSITORY_CONNECTION_ERROR)
+              {
+                // This error can only come from grabbing the connections.  So, if this occurs it means that
+                // all the documents we've been handed have to be stuffed back onto the queue for processing at a later time.
+                Logging.threads.warn("Document cleanup thread couldn't establish necessary connections, retrying later: "+e.getMessage(),e);
 
-                  // Let the unprocessed documents get requeued!  This is handled at the end of the loop...
-                }
-                else
-                  throw e;
+                // Let the unprocessed documents get requeued!  This is handled at the end of the loop...
               }
+              else
+                throw e;
             }
           }
           catch (ManifoldCFException e)
@@ -306,7 +272,15 @@ public class DocumentCleanupThread exten
             if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
               throw e;
 
-            Logging.threads.error("Exception tossed: "+e.getMessage(),e);
+            // It's ok to abort a job because we can't talk to the search engine during cleanup.
+            if (jobManager.errorAbort(dds.getJobDescription().getID(),e.getMessage()))
+            {
+              // We eat the exception if there was already one recorded.
+
+              // An exception occurred in the processing of a set of documents.
+              // Shut the corresponding job down, with an appropriate error
+              Logging.threads.error("Exception tossed: "+e.getMessage(),e);
+            }
           }
           finally
           {
@@ -319,7 +293,7 @@ public class DocumentCleanupThread exten
               {
                 DocumentDescription ddd = dqd.getDocumentDescription();
                 // Requeue this document!
-                jobManager.resetCleaningDocument(ddd);
+                jobManager.resetCleaningDocument(ddd,0L);
                 dqd.setProcessed();
               }
               j++;

Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteSet.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteSet.java?rev=1058837&r1=1058836&r2=1058837&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteSet.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteSet.java Fri Jan 14 02:21:53 2011
@@ -32,15 +32,27 @@ public class DocumentDeleteSet
 
   /** This is the array of documents to delete. */
   protected DeleteQueuedDocument[] documents;
-
+  /** The job description for this set of documents. */
+  protected IJobDescription jobDescription;
+  
   /** Constructor.
   *@param documents is the arraylist representing the documents for this chunk.
+  *@param jobDescription is the job description for all the documents.
   */
-  public DocumentDeleteSet(DeleteQueuedDocument[] documents)
+  public DocumentDeleteSet(DeleteQueuedDocument[] documents, IJobDescription jobDescription)
   {
     this.documents = documents;
+    this.jobDescription = jobDescription;
   }
 
+  /** Get the job description.
+  *@return the job description.
+  */
+  public IJobDescription getJobDescription()
+  {
+    return this.jobDescription;
+  }
+  
   /** Get the number of documents.
   *@return the number.
   */

Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteStufferThread.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteStufferThread.java?rev=1058837&r1=1058836&r2=1058837&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteStufferThread.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteStufferThread.java Fri Jan 14 02:21:53 2011
@@ -96,11 +96,13 @@ public class DocumentDeleteStufferThread
 
           Logging.threads.debug("Document delete stuffer thread woke up");
 
+          long currentTime = System.currentTimeMillis();
+          
+          // Get a single chunk at a time (but keep going until everything is stuffed)
           // This method will set the status of the documents in question
           // to "beingdeleted".
 
-          // Get a single chunk at a time (but keep going until everything is stuffed)
-          DocumentDescription[] descs = jobManager.getNextDeletableDocuments(deleteChunkSize);
+          DocumentDescription[] descs = jobManager.getNextDeletableDocuments(deleteChunkSize,currentTime);
 
           // If there are no chunks at all, then we can sleep for a while.
           // The theory is that we need to allow stuff to accumulate.
@@ -114,16 +116,39 @@ public class DocumentDeleteStufferThread
           if (Logging.threads.isDebugEnabled())
             Logging.threads.debug("Document delete stuffer thread found "+Integer.toString(descs.length)+" documents");
 
-          // Do the stuffing
-          DeleteQueuedDocument[] docDescs = new DeleteQueuedDocument[descs.length];
+          // Do the stuffing.  Each set must be segregated by job, since we need the job ID in the doc set.
+          Map jobMap = new HashMap();
           int k = 0;
-          while (k < docDescs.length)
+          while (k < descs.length)
           {
-            docDescs[k] = new DeleteQueuedDocument(descs[k]);
+            DeleteQueuedDocument x = new DeleteQueuedDocument(descs[k]);
+            Long jobID = descs[k].getJobID();
+            List y = (List)jobMap.get(jobID);
+            if (y == null)
+            {
+              y = new ArrayList();
+              jobMap.put(jobID,y);
+            }
+            y.add(x);
             k++;
           }
-          DocumentDeleteSet set = new DocumentDeleteSet(docDescs);
-          documentDeleteQueue.addDocuments(set);
+          
+          Iterator iter = jobMap.keySet().iterator();
+          while (iter.hasNext())
+          {
+            Long jobID = (Long)iter.next();
+            IJobDescription jobDescription = jobManager.load(jobID,true);
+            List y = (List)jobMap.get(jobID);
+            DeleteQueuedDocument[] docDescs = new DeleteQueuedDocument[y.size()];
+            k = 0;
+            while (k < docDescs.length)
+            {
+              docDescs[k] = (DeleteQueuedDocument)y.get(k);
+              k++;
+            }
+            DocumentDeleteSet set = new DocumentDeleteSet(docDescs,jobDescription);
+            documentDeleteQueue.addDocuments(set);
+          }
 
           // If we don't wait here, the other threads don't have a chance to queue anything else up.
           yield();

Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteThread.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteThread.java?rev=1058837&r1=1058836&r2=1058837&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteThread.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteThread.java Fri Jan 14 02:21:53 2011
@@ -91,6 +91,10 @@ public class DocumentDeleteThread extend
             // Reset
             continue;
 
+          IJobDescription job = dds.getJobDescription();
+          String connectionName = job.getConnectionName();
+          String outputConnectionName = job.getOutputConnectionName();
+
           try
           {
             // Do the delete work.
@@ -99,116 +103,82 @@ public class DocumentDeleteThread extend
             // with the individual connection, so the first job is to segregate what came in into connection bins.  Then, we process each connection
             // bin appropriately.
 
-            // This is a map keyed by connection name, and containing elements that are an ArrayList of DeleteQueuedDocument objects.
-            Map mappedDocs = new HashMap();
+            boolean[] deleteFromQueue = new boolean[dds.getCount()];
+                
+            String[] docClassesToRemove = new String[dds.getCount()];
+            String[] hashedDocsToRemove = new String[dds.getCount()];
+            DeleteQueuedDocument[] docsToDelete = new DeleteQueuedDocument[dds.getCount()];
             int j = 0;
             while (j < dds.getCount())
             {
-              DeleteQueuedDocument dqd = dds.getDocument(j++);
+              DeleteQueuedDocument dqd = dds.getDocument(j);
               DocumentDescription ddd = dqd.getDocumentDescription();
-              Long jobID = ddd.getJobID();
-              IJobDescription job = jobManager.load(jobID,true);
-              String connectionName = job.getConnectionName();
-              ArrayList list = (ArrayList)mappedDocs.get(connectionName);
-              if (list == null)
-              {
-                list = new ArrayList();
-                mappedDocs.put(connectionName,list);
-              }
-              list.add(dqd);
+              docClassesToRemove[j] = connectionName;
+              hashedDocsToRemove[j] = ddd.getDocumentIdentifierHash();
+              docsToDelete[j] = dqd;
+              deleteFromQueue[j] = false;
+              j++;
             }
-
-            // For each connection, construct the necessary pieces to do the deletion.
-            Iterator iter = mappedDocs.keySet().iterator();
-            while (iter.hasNext())
+                
+            OutputRemoveActivity logger = new OutputRemoveActivity(connectionName,connMgr,outputConnectionName);
+                
+            try
             {
-              String connectionName = (String)iter.next();
-              ArrayList list = (ArrayList)mappedDocs.get(connectionName);
-
-              // Segregate by output connection as well.
-              HashMap outputMap = new HashMap();
+              ingester.documentDeleteMultiple(outputConnectionName,docClassesToRemove,hashedDocsToRemove,logger);
               j = 0;
-              while (j < list.size())
+              while (j < dds.getCount())
               {
-                DeleteQueuedDocument dqd = (DeleteQueuedDocument)list.get(j);
-                DocumentDescription ddd = dqd.getDocumentDescription();
-                Long jobID = ddd.getJobID();
-                IJobDescription job = jobManager.load(jobID,true);
-                String outputConnectionName = job.getOutputConnectionName();
-
-                ArrayList subList = (ArrayList)outputMap.get(outputConnectionName);
-                if (subList == null)
-                {
-                  subList = new ArrayList();
-                  outputMap.put(outputConnectionName,subList);
-                }
-                subList.add(new Integer(j));
+                deleteFromQueue[j] = true;
                 j++;
               }
-
-              // Now, cycle through all the output connections
-              Iterator outputIterator = outputMap.keySet().iterator();
-              while (outputIterator.hasNext())
+            }
+            catch (ServiceInterruption e)
+            {
+              // We don't know which failed, or maybe they all did.
+              // Go through the list of documents we just tried, and reset them on the queue based on the
+              // ServiceInterruption parameters.  Then we must proceed to delete ONLY the documents that
+              // were not part of the index deletion attempt.
+              j = 0;
+              while (j < dds.getCount())
               {
-                String outputConnectionName = (String)outputIterator.next();
-                ArrayList subList = (ArrayList)outputMap.get(outputConnectionName);
-
-                String[] docClassesToRemove = new String[subList.size()];
-                String[] hashedDocsToRemove = new String[subList.size()];
-                DeleteQueuedDocument[] docsToDelete = new DeleteQueuedDocument[subList.size()];
-                j = 0;
-                while (j < subList.size())
-                {
-                  int index = ((Integer)subList.get(j)).intValue();
-                  DeleteQueuedDocument dqd = (DeleteQueuedDocument)list.get(index);
-                  DocumentDescription ddd = dqd.getDocumentDescription();
-                  Long jobID = ddd.getJobID();
-                  IJobDescription job = jobManager.load(jobID,true);
-                  docClassesToRemove[j] = connectionName;
-                  hashedDocsToRemove[j] = ddd.getDocumentIdentifierHash();
-                  docsToDelete[j] = dqd;
-                  j++;
-                }
-                OutputRemoveActivity logger = new OutputRemoveActivity(connectionName,connMgr,outputConnectionName);
-                while (true)
-                {
-                  try
-                  {
-                    ingester.documentDeleteMultiple(outputConnectionName,docClassesToRemove,hashedDocsToRemove,logger);
-                    break;
-                  }
-                  catch (ServiceInterruption e)
-                  {
-                    // No document deletions can take place while the ingestion API is down, so simply wait for it to come back up.  There's
-                    // nothing better for this thread to be doing...
-                    // Wait for the prescribed time
-                    long amt = e.getRetryTime();
-                    long now = System.currentTimeMillis();
-                    long waittime = amt-now;
-                    if (waittime <= 0L)
-                      waittime = 300000L;
-                    ManifoldCF.sleep(waittime);
-                  }
-                }
-
-                // Delete the records
-                DocumentDescription[] deleteDescriptions = new DocumentDescription[docsToDelete.length];
-                j = 0;
-                while (j < deleteDescriptions.length)
-                {
-                  deleteDescriptions[j] = docsToDelete[j].getDocumentDescription();
-                  j++;
-                }
-                jobManager.deleteIngestedDocumentIdentifiers(deleteDescriptions);
-                // Mark them as gone
-                j = 0;
-                while (j < docsToDelete.length)
-                {
-                  docsToDelete[j++].wasProcessed();
-                }
+                DeleteQueuedDocument cqd = docsToDelete[j];
+                DocumentDescription dd = cqd.getDocumentDescription();
+                // To recover from a deletion failure, requeue the document (to COMPLETED etc.) with the ServiceInterruption retry time.
+                jobManager.resetDeletingDocument(dd,e.getRetryTime());
+                cqd.setProcessed();
+                j++;
               }
             }
 
+            // Count the records we're actually going to delete
+            int recordCount = 0;
+            j = 0;
+            while (j < dds.getCount())
+            {
+              if (deleteFromQueue[j])
+                recordCount++;
+              j++;
+            }
+                
+            // Delete the records
+            DocumentDescription[] deleteDescriptions = new DocumentDescription[recordCount];
+            j = 0;
+            recordCount = 0;
+            while (j < dds.getCount())
+            {
+              if (deleteFromQueue[j])
+                deleteDescriptions[recordCount++] = docsToDelete[j].getDocumentDescription();
+              j++;
+            }
+            jobManager.deleteIngestedDocumentIdentifiers(deleteDescriptions);
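+            // Mark them as gone.  NOTE(review): wasProcessed() reads like a query; setProcessed() is used elsewhere to mark -- confirm.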
+            j = 0;
+            while (j < dds.getCount())
+            {
+              if (deleteFromQueue[j])
+                docsToDelete[j].wasProcessed();
+              j++;
+            }
             // Go around again
           }
           finally
@@ -226,7 +196,7 @@ public class DocumentDeleteThread extend
                 // Pop this document back into the jobqueue in an appropriate state
                 DocumentDescription ddd = dqd.getDocumentDescription();
                 // Requeue this document!
-                jobManager.resetDeletingDocument(ddd);
+                jobManager.resetDeletingDocument(ddd,0L);
                 dqd.setProcessed();
 
               }

Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireStufferThread.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireStufferThread.java?rev=1058837&r1=1058836&r2=1058837&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireStufferThread.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireStufferThread.java Fri Jan 14 02:21:53 2011
@@ -135,19 +135,41 @@ public class ExpireStufferThread extends
             continue;
           }
 
-          // Do the stuffing
-          CleanupQueuedDocument[] docDescs = new CleanupQueuedDocument[descs.length];
+          // Do the stuffing.  Each set must be segregated by job, since we need the job ID in the doc set.
+          Map jobMap = new HashMap();
           int k = 0;
-          while (k < docDescs.length)
+          while (k < descs.length)
           {
-            docDescs[k] = new CleanupQueuedDocument(descs[k],deleteFromIndex[k]);
+            CleanupQueuedDocument x = new CleanupQueuedDocument(descs[k],deleteFromIndex[k]);
+            Long jobID = descs[k].getJobID();
+            List y = (List)jobMap.get(jobID);
+            if (y == null)
+            {
+              y = new ArrayList();
+              jobMap.put(jobID,y);
+            }
+            y.add(x);
             k++;
           }
-          DocumentCleanupSet set = new DocumentCleanupSet(docDescs);
-          documentQueue.addDocuments(set);
+          
+          Iterator iter = jobMap.keySet().iterator();
+          while (iter.hasNext())
+          {
+            Long jobID = (Long)iter.next();
+            IJobDescription jobDescription = jobManager.load(jobID,true);
+            List y = (List)jobMap.get(jobID);
+            CleanupQueuedDocument[] docDescs = new CleanupQueuedDocument[y.size()];
+            k = 0;
+            while (k < docDescs.length)
+            {
+              docDescs[k] = (CleanupQueuedDocument)y.get(k);
+              k++;
+            }
+            DocumentCleanupSet set = new DocumentCleanupSet(docDescs,jobDescription);
+            documentQueue.addDocuments(set);
+          }
 
-          // If we don't wait here, the other threads don't seem to have a chance to queue anything else up.
-          ManifoldCF.sleep(1000L);
+          yield();
         }
         catch (ManifoldCFException e)
         {

Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java?rev=1058837&r1=1058836&r2=1058837&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java Fri Jan 14 02:21:53 2011
@@ -94,197 +94,177 @@ public class ExpireThread extends Thread
           if (Thread.currentThread().isInterrupted())
             throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
 
+          IJobDescription job = dds.getJobDescription();
+          String connectionName = job.getConnectionName();
+          String outputConnectionName = job.getOutputConnectionName();
+          
           try
           {
             long currentTime = System.currentTimeMillis();
 
-            // We need to segregate all the documents by connection, in order to be able to form a decent activities object
-            // to pass into the incremental ingester.  So, first pass through the document descriptions will build that.
-            Map mappedDocs = new HashMap();
+            // Documents will be naturally segregated by connection, since each set comes from a single job.
+
+            // Load the single repository connection object for this job's connection name.  It is used below to grab a connector instance.
+            IRepositoryConnection connection = connMgr.load(connectionName);
+            
+            // This is where we store the hopcount cleanup data
+            ArrayList arrayDocHashes = new ArrayList();
+            ArrayList arrayDocsToDelete = new ArrayList();
+            ArrayList arrayRelationshipTypes = new ArrayList();
+            ArrayList hopcountMethods = new ArrayList();
+            
             int j = 0;
             while (j < dds.getCount())
             {
-              CleanupQueuedDocument dqd = dds.getDocument(j++);
+              CleanupQueuedDocument dqd = dds.getDocument(j);
               DocumentDescription ddd = dqd.getDocumentDescription();
-              Long jobID = ddd.getJobID();
-              IJobDescription job = jobManager.load(jobID,true);
-              String connectionName = job.getConnectionName();
-              ArrayList list = (ArrayList)mappedDocs.get(connectionName);
-              if (list == null)
+              if (job != null && connection != null)
               {
-                list = new ArrayList();
-                mappedDocs.put(connectionName,list);
+                // We'll need the legal link types; grab those before we proceed
+                String[] legalLinkTypes = RepositoryConnectorFactory.getRelationshipTypes(threadContext,connection.getClassName());
+                if (legalLinkTypes != null)
+                {
+                  arrayDocHashes.add(ddd.getDocumentIdentifierHash());
+                  arrayDocsToDelete.add(dqd);
+                  arrayRelationshipTypes.add(legalLinkTypes);
+                  hopcountMethods.add(new Integer(job.getHopcountMode()));
+                }
               }
-              list.add(dqd);
+              j++;
             }
 
-            // Now, cycle through all represented connections.
-            // For each connection, construct the necessary pieces to do the deletion.
-            Iterator iter = mappedDocs.keySet().iterator();
-            while (iter.hasNext())
+            // Grab one connection for the connectionName.  If we fail, nothing is lost and retries are possible.
+            try
             {
-              String connectionName = (String)iter.next();
-              ArrayList list = (ArrayList)mappedDocs.get(connectionName);
-
-              // Produce a map of connection name->connection object.  We will use this to perform a request for multiple connector objects
-              IRepositoryConnection connection = connMgr.load(connectionName);
-              ArrayList arrayOutputConnectionNames = new ArrayList();
-              ArrayList arrayDocHashes = new ArrayList();
-              ArrayList arrayDocClasses = new ArrayList();
-              ArrayList arrayDocsToDelete = new ArrayList();
-              ArrayList arrayRelationshipTypes = new ArrayList();
-              ArrayList hopcountMethods = new ArrayList();
-              ArrayList connections = new ArrayList();
-              j = 0;
-              while (j < list.size())
+              IRepositoryConnector connector = RepositoryConnectorFactory.grab(threadContext,connection.getClassName(),connection.getConfigParams(),connection.getMaxConnections());
+              try
               {
-                CleanupQueuedDocument dqd = (CleanupQueuedDocument)list.get(j);
-                DocumentDescription ddd = dqd.getDocumentDescription();
-                Long jobID = ddd.getJobID();
-                IJobDescription job = jobManager.load(jobID,true);
-                if (job != null && connection != null)
+
+                // Track, per document, whether it may be removed from the job queue
+                boolean[] deleteFromQueue = new boolean[arrayDocHashes.size()];
+                    
+                // Count the number of docs to actually delete.  This will be a subset of the documents in the list.
+                int k = 0;
+                int removeCount = 0;
+                while (k < arrayDocHashes.size())
                 {
-                  // We'll need the legal link types; grab those before we proceed
-                  String[] legalLinkTypes = RepositoryConnectorFactory.getRelationshipTypes(threadContext,connection.getClassName());
-                  if (legalLinkTypes != null)
+                  if (((CleanupQueuedDocument)arrayDocsToDelete.get(k)).shouldBeRemovedFromIndex())
                   {
-                    arrayOutputConnectionNames.add(job.getOutputConnectionName());
-                    arrayDocClasses.add(connectionName);
-                    arrayDocHashes.add(ddd.getDocumentIdentifierHash());
-                    arrayDocsToDelete.add(dqd);
-                    arrayRelationshipTypes.add(legalLinkTypes);
-                    hopcountMethods.add(new Integer(job.getHopcountMode()));
+                    deleteFromQueue[k] = false;
+                    removeCount++;
                   }
+                  else
+                    deleteFromQueue[k] = true;
+                  k++;
                 }
-                j++;
-              }
-
-              // Next, segregate the documents by output connection name.  This will permit logging to know what actual activity type to use.
-              HashMap outputMap = new HashMap();
-              j = 0;
-              while (j < arrayDocHashes.size())
-              {
-                String outputConnectionName = (String)arrayOutputConnectionNames.get(j);
-                ArrayList subList = (ArrayList)outputMap.get(outputConnectionName);
-                if (subList == null)
+                    
+                // Allocate removal arrays
+                String[] docClassesToRemove = new String[removeCount];
+                String[] hashedDocsToRemove = new String[removeCount];
+
+                // Now, iterate over the list
+                k = 0;
+                removeCount = 0;
+                while (k < arrayDocHashes.size())
                 {
-                  subList = new ArrayList();
-                  outputMap.put(outputConnectionName,subList);
+                  if (((CleanupQueuedDocument)arrayDocsToDelete.get(k)).shouldBeRemovedFromIndex())
+                  {
+                    docClassesToRemove[removeCount] = connectionName;
+                    hashedDocsToRemove[removeCount] = (String)arrayDocHashes.get(k);
+                    removeCount++;
+                  }
+                  k++;
                 }
-                subList.add(new Integer(j));
-                j++;
-              }
 
-              // Grab one connection for each connectionName.  If we fail, nothing is lost and retries are possible.
-              try
-              {
-                IRepositoryConnector connector = RepositoryConnectorFactory.grab(threadContext,connection.getClassName(),connection.getConfigParams(),connection.getMaxConnections());
+                OutputRemoveActivity activities = new OutputRemoveActivity(connectionName,connMgr,outputConnectionName);
+
+                // Finally, go ahead and delete the documents from the ingestion system.
+                // If we fail, we need to put the documents back on the queue.
                 try
                 {
-
-                  // Iterate over the outputs
-                  Iterator outputIterator = outputMap.keySet().iterator();
-                  while (outputIterator.hasNext())
+                  ingester.documentDeleteMultiple(outputConnectionName,docClassesToRemove,hashedDocsToRemove,activities);
+                  // Success!  Label all these as needing deletion from queue.
+                  k = 0;
+                  while (k < arrayDocHashes.size())
                   {
-                    String outputConnectionName = (String)outputIterator.next();
-                    ArrayList indexList = (ArrayList)outputMap.get(outputConnectionName);
-                    // Count the number of docs to actually delete.  This will be a subset of the documents in the list.
-                    int k = 0;
-                    int removeCount = 0;
-                    while (k < indexList.size())
-                    {
-                      int index = ((Integer)indexList.get(k++)).intValue();
-                      if (((CleanupQueuedDocument)arrayDocsToDelete.get(index)).shouldBeRemovedFromIndex())
-                        removeCount++;
-                    }
-                    
-                    // Allocate removal arrays
-                    String[] docClassesToRemove = new String[removeCount];
-                    String[] hashedDocsToRemove = new String[removeCount];
-
-                    // Now, iterate over the index list
-                    k = 0;
-                    removeCount = 0;
-                    while (k < indexList.size())
-                    {
-                      int index = ((Integer)indexList.get(k)).intValue();
-                      if (((CleanupQueuedDocument)arrayDocsToDelete.get(index)).shouldBeRemovedFromIndex())
-                      {
-                        docClassesToRemove[removeCount] = (String)arrayDocClasses.get(index);
-                        hashedDocsToRemove[removeCount] = (String)arrayDocHashes.get(index);
-                        removeCount++;
-                      }
-                      k++;
-                    }
-
-                    OutputRemoveActivity activities = new OutputRemoveActivity(connectionName,connMgr,outputConnectionName);
-
-                    // Finally, go ahead and delete the documents from the ingestion system.
-
-                    while (true)
+                    if (((CleanupQueuedDocument)arrayDocsToDelete.get(k)).shouldBeRemovedFromIndex())
+                      deleteFromQueue[k] = true;
+                    k++;
+                  }
+                }
+                catch (ServiceInterruption e)
+                {
+                  // We don't know which failed, or maybe they all did.
+                  // Go through the list of documents we just tried, and reset them on the queue based on the
+                  // ServiceInterruption parameters.  Then we must proceed to delete ONLY the documents that
+                  // were not part of the index deletion attempt.
+                  k = 0;
+                  while (k < arrayDocHashes.size())
+                  {
+                    CleanupQueuedDocument cqd = (CleanupQueuedDocument)arrayDocsToDelete.get(k);
+                    if (cqd.shouldBeRemovedFromIndex())
                     {
-                      try
+                      DocumentDescription dd = cqd.getDocumentDescription();
+                      if (dd.getFailTime() != -1L && dd.getFailTime() < e.getRetryTime() ||
+                        dd.getFailRetryCount() == 0)
                       {
-                        ingester.documentDeleteMultiple(outputConnectionName,docClassesToRemove,hashedDocsToRemove,activities);
-                        break;
+                        // Treat this as a hard failure.
+                        if (e.isAbortOnFail())
+                          throw new ManifoldCFException("Repeated service interruptions - failure expiring document"+((e.getCause()!=null)?": "+e.getCause().getMessage():""),e.getCause());
                       }
-                      catch (ServiceInterruption e)
+                      else
                       {
-                        // If we get a service interruption here, it means that the ingestion API is down.
-                        // There is no point, therefore, in freeing up this thread to go do something else;
-                        // might as well just wait here for our retries.
-                        // Wait for the prescribed time
-                        long amt = e.getRetryTime();
-                        long now = System.currentTimeMillis();
-                        long waittime = amt-now;
-                        if (waittime <= 0L)
-                          waittime = 300000L;
-                        ManifoldCF.sleep(waittime);
+                        // To recover from an expiration failure, requeue the document to PENDING etc.
+                        jobManager.resetDocument(dd,e.getRetryTime(),
+                          IJobManager.ACTION_REMOVE,e.getFailTime(),e.getFailRetryCount());
+                        cqd.setProcessed();
                       }
                     }
-
-                    // Successfully deleted some documents from ingestion system.  Now, remove them from job queue.  This
-                    // must currently happen one document at a time, because the jobs and connectors for each document
-                    // potentially differ.
-                    k = 0;
-                    while (k < indexList.size())
-                    {
-                      int index = ((Integer)indexList.get(k)).intValue();
-
-                      CleanupQueuedDocument dqd = (CleanupQueuedDocument)arrayDocsToDelete.get(index);
-                      DocumentDescription ddd = dqd.getDocumentDescription();
-                      Long jobID = ddd.getJobID();
-                      int hopcountMethod = ((Integer)hopcountMethods.get(index)).intValue();
-                      String[] legalLinkTypes = (String[])arrayRelationshipTypes.get(index);
-                      DocumentDescription[] requeueCandidates = jobManager.markDocumentDeleted(jobID,legalLinkTypes,ddd,hopcountMethod);
-                      // Use the common method for doing the requeuing
-                      ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,
-                        connector,connection,queueTracker,currentTime);
-                      // Finally, completed expiration of the document.
-                      dqd.setProcessed();
-                      k++;
-                    }
+                    k++;
                   }
                 }
-                finally
+
+                // Now, remove from the job queue every document flagged in deleteFromQueue: those whose index
+                // deletion succeeded, plus those that never needed index deletion.  This happens one document
+                // at a time, via jobManager.markDocumentDeleted().
+                k = 0;
+                while (k < arrayDocHashes.size())
                 {
-                  // Free up the reserved connector instance
-                  RepositoryConnectorFactory.release(connector);
+                  if (deleteFromQueue[k])
+                  {
+                    CleanupQueuedDocument dqd = (CleanupQueuedDocument)arrayDocsToDelete.get(k);
+                    DocumentDescription ddd = dqd.getDocumentDescription();
+                    Long jobID = ddd.getJobID();
+                    int hopcountMethod = ((Integer)hopcountMethods.get(k)).intValue();
+                    String[] legalLinkTypes = (String[])arrayRelationshipTypes.get(k);
+                    DocumentDescription[] requeueCandidates = jobManager.markDocumentDeleted(jobID,legalLinkTypes,ddd,hopcountMethod);
+                    // Use the common method for doing the requeuing
+                    ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,
+                      connector,connection,queueTracker,currentTime);
+                    // Finally, completed expiration of the document.
+                    dqd.setProcessed();
+                  }
+                  k++;
                 }
               }
-              catch (ManifoldCFException e)
+              finally
               {
-                if (e.getErrorCode() == ManifoldCFException.REPOSITORY_CONNECTION_ERROR)
-                {
-                  // This error can only come from grabbing the connections.  So, if this occurs it means that
-                  // all the documents we've been handed have to be stuffed back onto the queue for processing at a later time.
-                  Logging.threads.warn("Expire thread couldn't establish necessary connections, retrying later: "+e.getMessage(),e);
+                // Free up the reserved connector instance
+                RepositoryConnectorFactory.release(connector);
+              }
+            }
+            catch (ManifoldCFException e)
+            {
+              if (e.getErrorCode() == ManifoldCFException.REPOSITORY_CONNECTION_ERROR)
+              {
+                // This error can only come from grabbing the connections.  So, if this occurs it means that
+                // all the documents we've been handed have to be stuffed back onto the queue for processing at a later time.
+                Logging.threads.warn("Expire thread couldn't establish necessary connections, retrying later: "+e.getMessage(),e);
 
-                  // Let the unprocessed documents get requeued!  This is handled at the end of the loop...
-                }
-                else
-                  throw e;
+                // Let the unprocessed documents get requeued!  This is handled at the end of the loop...
               }
+              else
+                throw e;
             }
           }
           catch (ManifoldCFException e)
@@ -295,7 +275,15 @@ public class ExpireThread extends Thread
             if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
               throw e;
 
-            Logging.threads.error("Exception tossed: "+e.getMessage(),e);
+            // It's ok to abort a job because we can't talk to the search engine.
+            if (jobManager.errorAbort(dds.getJobDescription().getID(),e.getMessage()))
+            {
+              // We eat the exception if there was already one recorded.
+
+              // An exception occurred in the processing of a set of documents.
+              // Shut the corresponding job down, with an appropriate error
+              Logging.threads.error("Exception tossed: "+e.getMessage(),e);
+            }
           }
           finally
           {