You are viewing a plain text version of this content. The canonical version was linked from the original page; that hyperlink is not preserved in this plain-text rendering.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2011/01/10 03:35:42 UTC

svn commit: r1057076 [1/2] - in /incubator/lcf/trunk: ./ framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/ framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/ framework/pull-agent/src/main/java/org/apache/m...

Author: kwright
Date: Mon Jan 10 02:35:42 2011
New Revision: 1057076

URL: http://svn.apache.org/viewvc?rev=1057076&view=rev
Log:
Fix for CONNECTORS-146.  Introduce a new flow for documents being cleaned up, as opposed to being deleted as part of a job's deletion.  This furthermore allows us to fix a problem with carrydown data being affected by document cleanup.

Added:
    incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/DocumentSetAndFlags.java   (with props)
    incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CleanupQueuedDocument.java   (with props)
    incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocCleanupResetManager.java   (with props)
    incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupQueue.java   (with props)
    incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupSet.java   (with props)
    incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupStufferThread.java   (with props)
    incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java   (with props)
Modified:
    incubator/lcf/trunk/CHANGES.txt
    incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
    incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopCount.java
    incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
    incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
    incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
    incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java

Modified: incubator/lcf/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/CHANGES.txt?rev=1057076&r1=1057075&r2=1057076&view=diff
==============================================================================
--- incubator/lcf/trunk/CHANGES.txt (original)
+++ incubator/lcf/trunk/CHANGES.txt Mon Jan 10 02:35:42 2011
@@ -14,6 +14,10 @@ forget the index state for an output con
 ======================= Release 0.1 =======================
 Release Date:  See http://incubator.apache.org/connectors for the official release date.
 
+CONNECTORS-146: Problem with document cleanup logic would cause data corruption
+in carrydown data and in hopcount information.
+(Karl Wright)
+
 CONNECTORS-143: Copyright notice needs to be changed to 2011.
 (Sebb, Karl Wright)
 

Added: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/DocumentSetAndFlags.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/DocumentSetAndFlags.java?rev=1057076&view=auto
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/DocumentSetAndFlags.java (added)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/DocumentSetAndFlags.java Mon Jan 10 02:35:42 2011
@@ -0,0 +1,49 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.crawler.interfaces;
+
+/** This class describes a set of documents and an associated boolean flag for each.
+*/
+public class DocumentSetAndFlags
+{
+  public static final String _rcsid = "@(#)$Id$";
+
+  protected DocumentDescription[] documentSet;
+  protected boolean[] flags;
+  
+  /** Constructor. */
+  public DocumentSetAndFlags(DocumentDescription[] documentSet, boolean[] flags)
+  {
+    this.documentSet = documentSet;
+    this.flags = flags;
+  }
+  
+  /** Get the document set. */
+  public DocumentDescription[] getDocumentSet()
+  {
+    return documentSet;
+  }
+  
+  /** Get the flags */
+  public boolean[] getFlags()
+  {
+    return flags;
+  }
+  
+}

Propchange: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/DocumentSetAndFlags.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/DocumentSetAndFlags.java
------------------------------------------------------------------------------
    svn:keywords = Id

Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java?rev=1057076&r1=1057075&r2=1057076&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java Mon Jan 10 02:35:42 2011
@@ -3,7 +3,7 @@
 /**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
+* this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
@@ -158,6 +158,11 @@ public interface IJobManager
   public void resetDocDeleteWorkerStatus()
     throws ManifoldCFException;
 
+  /** Reset as part of restoring doc cleanup threads.
+  */
+  public void resetDocCleanupWorkerStatus()
+    throws ManifoldCFException;
+
   /** Reset as part of restoring startup threads.
   */
   public void resetStartupWorkerStatus()
@@ -365,6 +370,23 @@ public interface IJobManager
   public void resetDeletingDocument(DocumentDescription documentDescription)
     throws ManifoldCFException;
 
+  /** Reset a cleaning document back to its former state.
+  * This gets done when a cleaning thread sees a service interruption, etc., from the ingestion system.
+  */
+  public void resetCleaningDocument(DocumentDescription documentDescription)
+    throws ManifoldCFException;
+
+  /** Reset a set of cleaning documents for further processing in the future.
+  * This method is called after some unknown number of the documents were cleaned, but then an ingestion service interruption occurred.
+  * Note well: The logic here basically presumes that we cannot know whether the documents were indeed cleaned or not.
+  * If we knew for a fact that none of the documents had been handled, it would be possible to look at the document's
+  * current status and decide what the new status ought to be, based on a true rollback scenario.  Such cases, however, are rare enough so that
+  * special logic is probably not worth it.
+  *@param documentDescriptions is the set of description objects for the documents that were being cleaned.
+  */
+  public void resetCleaningDocumentMultiple(DocumentDescription[] documentDescriptions)
+    throws ManifoldCFException;
+
   /** Add an initial set of documents to the queue.
   * This method is called during job startup, when the queue is being loaded.
   * A set of document references is passed to this method, which updates the status of the document
@@ -727,6 +749,14 @@ public interface IJobManager
   public DocumentDescription[] getNextDeletableDocuments(int n)
     throws ManifoldCFException;
 
+  /** Get list of cleanable document descriptions.  This list will take into account
+  * multiple jobs that may own the same document.
+  *@param n is the maximum number of documents to return.
+  *@return the document descriptions for these documents.
+  */
+  public DocumentSetAndFlags getNextCleanableDocuments(int n)
+    throws ManifoldCFException;
+
   /** Delete ingested document identifiers (as part of deleting the owning job).
   * The number of identifiers specified is guaranteed to be less than the maxInClauseCount
   * for the database.
@@ -755,8 +785,12 @@ public interface IJobManager
   public void finishJobs()
     throws ManifoldCFException;
 
-  /** Reset eligible jobs back to "inactive" state.  This method is used to pick up all jobs in the shutting down state
-  * whose purgatory records have been all cleaned up.
+  /** Reset eligible jobs either back to the "inactive" state, or make them active again.  The
+  * latter will occur if the cleanup phase of the job generated more pending documents.
+  *
+  *  This method is used to pick up all jobs in the shutting down state
+  * whose purgatory or being-cleaned records have been all processed.
+  *
   *@param currentTime is the current time in milliseconds since epoch.
   *@param resetJobs is filled in with the set of IJobDescription objects that were reset.
   */

Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopCount.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopCount.java?rev=1057076&r1=1057075&r2=1057076&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopCount.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopCount.java Mon Jan 10 02:35:42 2011
@@ -506,7 +506,7 @@ public class HopCount extends org.apache
   }
 
   /** Remove a set of document identifiers specified as a criteria.  This will remove hopcount rows and
-  * also intrinsic links that have the specified document identifiers are sources.
+  * also intrinsic links that have the specified document identifiers as sources.
   */
   public void deleteMatchingDocuments(Long jobID, String[] legalLinkTypes,
     String sourceTableName,

Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1057076&r1=1057075&r2=1057076&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Mon Jan 10 02:35:42 2011
@@ -740,6 +740,16 @@ public class JobManager implements IJobM
     Logging.jobs.debug("Reset complete");
   }
 
+  /** Reset as part of restoring doc cleanup threads.
+  */
+  public void resetDocCleanupWorkerStatus()
+    throws ManifoldCFException
+  {
+    Logging.jobs.debug("Resetting doc cleaning status");
+    jobQueue.resetDocCleanupWorkerStatus();
+    Logging.jobs.debug("Reset complete");
+  }
+
   /** Reset as part of restoring startup threads.
   */
   public void resetStartupWorkerStatus()
@@ -765,6 +775,198 @@ public class JobManager implements IJobM
     // carrydown records get removed when the job itself is removed.
   }
 
+  /** Get list of cleanable document descriptions.  This list will take into account
+  * multiple jobs that may own the same document.  All documents for which a description
+  * is returned will be transitioned to the "beingcleaned" state.  Documents which another
+  * job still holds in an active, being-deleted, or being-cleaned state are excluded by
+  * the query; this method does not delete any jobqueue entries.
+  *@param maxCount is the maximum number of documents to return.
+  *@return the document descriptions, each paired with a flag that is true if the document may be removed from the index.
+  */
+  public DocumentSetAndFlags getNextCleanableDocuments(int maxCount)
+    throws ManifoldCFException
+  {
+    // The query will be built here, because it joins the jobs table against the jobqueue
+    // table.
+    //
+    // This query must only pick up documents that are not active in any job and
+    // which belong to a job that's in a "shutting down" state and are in
+    // a "purgatory" state.
+    //
+    // We are in fact more conservative in this query than we need to be; the documents
+    // excluded will include some that simply match our criteria, which is designed to
+    // be fast rather than perfect.  The match we make is: hashvalue against hashvalue, and
+    // different job id's.
+    //
+    // SELECT id,jobid,docid FROM jobqueue t0 WHERE t0.status='P' AND EXISTS(SELECT 'x' FROM
+    //              jobs t3 WHERE t0.jobid=t3.id AND t3.status='X')
+    //      AND NOT EXISTS(SELECT 'x' FROM jobqueue t2 WHERE t0.hashval=t2.hashval AND t0.jobid!=t2.jobid
+    //              AND t2.status IN ('A','F','B'))
+    //
+
+    // Do a simple preliminary query, since the big query is currently slow, so that we don't waste time during stasis or
+    // ingestion.
+    // Moved outside of transaction, so we have no chance of locking up job status cache key for an extended period of time.
+    if (!jobs.cleaningJobsPresent())
+      return new DocumentSetAndFlags(new DocumentDescription[0],new boolean[0]);
+
+    long startTime = 0L;
+    if (Logging.perf.isDebugEnabled())
+    {
+      startTime = System.currentTimeMillis();
+      Logging.perf.debug("Waiting to find documents to put on the cleaning queue");
+    }
+
+    while (true)
+    {
+      long sleepAmt = 0L;
+      database.beginTransaction();
+      try
+      {
+        if (Logging.perf.isDebugEnabled())
+          Logging.perf.debug("After "+new Long(System.currentTimeMillis()-startTime).toString()+" ms, beginning query to look for documents to put on cleaning queue");
+
+        // Note: This query does not do "FOR UPDATE", because it is running under the only thread that can possibly change the document's state to "being cleaned".
+        ArrayList list = new ArrayList();
+        list.add(jobQueue.statusToString(jobQueue.STATUS_PURGATORY));
+        
+        list.add(jobs.statusToString(jobs.STATUS_SHUTTINGDOWN));
+        
+        list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVE));
+        list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVEPURGATORY));
+        list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCAN));
+        list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCANPURGATORY));
+        list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+        list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
+        
+        IResultSet set = database.performQuery("SELECT "+jobQueue.idField+","+jobQueue.jobIDField+","+jobQueue.docHashField+","+jobQueue.docIDField+","+
+          jobQueue.failTimeField+","+jobQueue.failCountField+" FROM "+
+          jobQueue.getTableName()+" t0 WHERE t0."+jobQueue.statusField+"=? "+
+          " AND EXISTS(SELECT 'x' FROM "+jobs.getTableName()+" t1 WHERE t0."+jobQueue.jobIDField+"=t1."+jobs.idField+
+          " AND t1."+jobs.statusField+"=?"+
+          ") AND NOT EXISTS(SELECT 'x' FROM "+jobQueue.getTableName()+" t2 WHERE t0."+jobQueue.docHashField+"=t2."+
+          jobQueue.docHashField+" AND t0."+jobQueue.jobIDField+"!=t2."+jobQueue.jobIDField+
+          " AND t2."+jobQueue.statusField+" IN (?,?,?,?,?,?)) "+database.constructOffsetLimitClause(0,maxCount),
+          list,null,null,maxCount,null);
+
+        if (Logging.perf.isDebugEnabled())
+          Logging.perf.debug("Done getting docs to cleaning queue after "+new Long(System.currentTimeMillis()-startTime).toString()+" ms.");
+
+        // We need to organize the returned set by connection name, so that we can efficiently
+        // use  getUnindexableDocumentIdentifiers.
+        // This is a table keyed by connection name and containing an ArrayList, which in turn contains DocumentDescription
+        // objects.
+        HashMap connectionNameMap = new HashMap();
+        HashMap documentIDMap = new HashMap();
+        int i = 0;
+        while (i < set.getRowCount())
+        {
+          IResultRow row = set.getRow(i);
+          Long jobID = (Long)row.getValue(jobQueue.jobIDField);
+          String documentIDHash = (String)row.getValue(jobQueue.docHashField);
+          String documentID = (String)row.getValue(jobQueue.docIDField);
+          Long failTimeValue = (Long)row.getValue(jobQueue.failTimeField);
+          Long failCountValue = (Long)row.getValue(jobQueue.failCountField);
+          // Failtime is probably not useful in this context, but we'll bring it along for completeness
+          long failTime;
+          if (failTimeValue == null)
+            failTime = -1L;
+          else
+            failTime = failTimeValue.longValue();
+          int failCount;
+          if (failCountValue == null)
+            failCount = 0;
+          else
+            failCount = (int)failCountValue.longValue();
+          IJobDescription jobDesc = load(jobID);
+          String connectionName = jobDesc.getConnectionName();
+          DocumentDescription dd = new DocumentDescription((Long)row.getValue(jobQueue.idField),
+            jobID,documentIDHash,documentID,failTime,failCount);
+          documentIDMap.put(documentIDHash,dd);
+          ArrayList x = (ArrayList)connectionNameMap.get(connectionName);
+          if (x == null)
+          {
+            // New entry needed
+            x = new ArrayList();
+            connectionNameMap.put(connectionName,x);
+          }
+          x.add(dd);
+          i++;
+        }
+
+        // For each bin, obtain a filtered answer, and enter all answers into a hash table.
+        // We'll then scan the result again to look up the right descriptions for return,
+        // and flag the ones whose entries may be removed from the index.
+        HashMap allowedDocIds = new HashMap();
+        Iterator iter = connectionNameMap.keySet().iterator();
+        while (iter.hasNext())
+        {
+          String connectionName = (String)iter.next();
+          ArrayList x = (ArrayList)connectionNameMap.get(connectionName);
+          // Do the filter query
+          DocumentDescription[] descriptions = new DocumentDescription[x.size()];
+          int j = 0;
+          while (j < descriptions.length)
+          {
+            descriptions[j] = (DocumentDescription)x.get(j);
+            j++;
+          }
+          String[] docIDHashes = getUnindexableDocumentIdentifiers(descriptions,connectionName);
+          j = 0;
+          while (j < docIDHashes.length)
+          {
+            String docIDHash = docIDHashes[j++];
+            allowedDocIds.put(docIDHash,docIDHash);
+          }
+        }
+
+        // Now, assemble a result, and change the state of the records accordingly
+        DocumentDescription[] rval = new DocumentDescription[documentIDMap.size()];
+        boolean[] rvalBoolean = new boolean[documentIDMap.size()];
+        i = 0;
+        iter = documentIDMap.keySet().iterator();
+        while (iter.hasNext())
+        {
+          String docIDHash = (String)iter.next();
+          DocumentDescription dd = (DocumentDescription)documentIDMap.get(docIDHash);
+          // Determine whether we can delete it from the index or not
+          rvalBoolean[i] = (allowedDocIds.get(docIDHash) != null);
+          // Set the record status to "being cleaned" and return it
+          rval[i++] = dd;
+          jobQueue.setCleaningStatus(dd.getID());
+        }
+
+        if (Logging.perf.isDebugEnabled())
+          Logging.perf.debug("Done pruning unindexable docs after "+new Long(System.currentTimeMillis()-startTime).toString()+" ms.");
+
+        return new DocumentSetAndFlags(rval,rvalBoolean);
+
+      }
+      catch (Error e)
+      {
+        database.signalRollback();
+        throw e;
+      }
+      catch (ManifoldCFException e)
+      {
+        database.signalRollback();
+        if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+        {
+          if (Logging.perf.isDebugEnabled())
+            Logging.perf.debug("Aborted transaction finding deleteable docs: "+e.getMessage());
+          sleepAmt = getRandomAmount();
+          continue;
+        }
+        throw e;
+      }
+      finally
+      {
+        database.endTransaction();
+        sleepFor(sleepAmt);
+      }
+    }
+  }
+
   /** Get list of deletable document descriptions.  This list will take into account
   * multiple jobs that may own the same document.  All documents for which a description
   * is returned will be transitioned to the "beingdeleted" state.  Documents which are
@@ -789,9 +991,8 @@ public class JobManager implements IJobM
     // be fast rather than perfect.  The match we make is: hashvalue against hashvalue, and
     // different job id's.
     //
-    // SELECT id,jobid,docid FROM jobqueue t0 WHERE ((t0.status IN ('C','P','G') AND EXISTS(SELECT 'x' FROM
-    //      jobs t1 WHERE t0.jobid=t1.id AND t1.status='D')) OR (t0.status='P' AND EXISTS(SELECT 'x' FROM
-    //              jobs t3 WHERE t0.jobid=t3.id AND t3.status='X')))
+    // SELECT id,jobid,docid FROM jobqueue t0 WHERE t0.status IN ('C','P','G') AND EXISTS(SELECT 'x' FROM
+    //      jobs t1 WHERE t0.jobid=t1.id AND t1.status='D')
     //      AND NOT EXISTS(SELECT 'x' FROM jobqueue t2 WHERE t0.hashval=t2.hashval AND t0.jobid!=t2.jobid
     //              AND t2.status IN ('A','F','B'))
     //
@@ -827,27 +1028,21 @@ public class JobManager implements IJobM
         
         list.add(jobs.statusToString(jobs.STATUS_READYFORDELETE));
         
-        list.add(jobQueue.statusToString(jobQueue.STATUS_PURGATORY));
-        
-        list.add(jobs.statusToString(jobs.STATUS_SHUTTINGDOWN));
-        
         list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVE));
         list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVEPURGATORY));
         list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCAN));
         list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCANPURGATORY));
         list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+        list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
         
         IResultSet set = database.performQuery("SELECT "+jobQueue.idField+","+jobQueue.jobIDField+","+jobQueue.docHashField+","+jobQueue.docIDField+","+
           jobQueue.failTimeField+","+jobQueue.failCountField+" FROM "+
-          jobQueue.getTableName()+" t0 WHERE ((t0."+jobQueue.statusField+" IN (?,?,?) "+
+          jobQueue.getTableName()+" t0 WHERE t0."+jobQueue.statusField+" IN (?,?,?) "+
           " AND EXISTS(SELECT 'x' FROM "+jobs.getTableName()+" t1 WHERE t0."+jobQueue.jobIDField+"=t1."+jobs.idField+
           " AND t1."+jobs.statusField+"=?"+
-          ")) OR (t0."+jobQueue.statusField+"=?"+
-          " AND EXISTS(SELECT 'x' FROM "+jobs.getTableName()+" t3 WHERE t0."+jobQueue.jobIDField+"=t3."+jobs.idField+
-          " AND t3."+jobs.statusField+"=?"+
-          "))) AND NOT EXISTS(SELECT 'x' FROM "+jobQueue.getTableName()+" t2 WHERE t0."+jobQueue.docHashField+"=t2."+
+          ") AND NOT EXISTS(SELECT 'x' FROM "+jobQueue.getTableName()+" t2 WHERE t0."+jobQueue.docHashField+"=t2."+
           jobQueue.docHashField+" AND t0."+jobQueue.jobIDField+"!=t2."+jobQueue.jobIDField+
-          " AND t2."+jobQueue.statusField+" IN (?,?,?,?,?)) "+database.constructOffsetLimitClause(0,maxCount),
+          " AND t2."+jobQueue.statusField+" IN (?,?,?,?,?,?)) "+database.constructOffsetLimitClause(0,maxCount),
           list,null,null,maxCount,null);
 
         if (Logging.perf.isDebugEnabled())
@@ -1329,6 +1524,7 @@ public class JobManager implements IJobM
     list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCAN));
     list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCANPURGATORY));
     list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+    list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
     
     StringBuffer sb = new StringBuffer("SELECT t0.");
     sb.append(jobQueue.idField).append(",t0.");
@@ -1346,7 +1542,7 @@ public class JobManager implements IJobM
     sb.append("NOT EXISTS(SELECT 'x' FROM ").append(jobQueue.getTableName()).append(" t2 WHERE t0.")
       .append(jobQueue.docHashField).append("=t2.").append(jobQueue.docHashField).append(" AND t0.")
       .append(jobQueue.jobIDField).append("!=t2.").append(jobQueue.jobIDField).append(" AND t2.")
-      .append(jobQueue.statusField).append(" IN (?,?,?,?,?))");
+      .append(jobQueue.statusField).append(" IN (?,?,?,?,?,?))");
     sb.append(" ").append(database.constructOffsetLimitClause(0,n));
 
     // Analyze jobqueue tables unconditionally, since it's become much more sensitive in 8.3 than it used to be.
@@ -1775,6 +1971,7 @@ public class JobManager implements IJobM
     list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCAN));
     list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCANPURGATORY));
     list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+    list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
     
     sb.append("t0.").append(jobQueue.checkTimeField).append("<=? AND ");
     sb.append("(t0.").append(jobQueue.checkActionField).append(" IS NULL OR t0.").append(jobQueue.checkActionField)
@@ -1785,7 +1982,7 @@ public class JobManager implements IJobM
     sb.append("NOT EXISTS(SELECT 'x' FROM ").append(jobQueue.getTableName()).append(" t2 WHERE t0.")
       .append(jobQueue.docHashField).append("=t2.").append(jobQueue.docHashField).append(" AND t0.")
       .append(jobQueue.jobIDField).append("!=t2.").append(jobQueue.jobIDField).append(" AND t2.")
-      .append(jobQueue.statusField).append(" IN (?,?,?,?,?)) AND ");
+      .append(jobQueue.statusField).append(" IN (?,?,?,?,?,?)) AND ");
 
     // Prerequisite event clause: AND NOT EXISTS(SELECT 'x' FROM prereqevents t3,events t4 WHERE t3.ownerid=t0.id AND t3.name=t4.name)
     sb.append("NOT EXISTS(SELECT 'x' FROM ").append(jobQueue.prereqEventManager.getTableName()).append(" t3,").append(eventManager.getTableName()).append(" t4 WHERE t0.")
@@ -2487,6 +2684,97 @@ public class JobManager implements IJobM
     }
   }
 
+  /** Reset a set of cleaning documents for further processing in the future.
+  * This method is called after some unknown number of the documents were cleaned, but then an ingestion service interruption occurred.
+  * Note well: The logic here basically presumes that we cannot know whether the documents were indeed cleaned or not.
+  * If we knew for a fact that none of the documents had been handled, it would be possible to look at the document's
+  * current status and decide what the new status ought to be, based on a true rollback scenario.  Such cases, however, are rare enough so that
+  * special logic is probably not worth it.
+  *@param documentDescriptions is the set of description objects for the documents that were being cleaned.
+  */
+  public void resetCleaningDocumentMultiple(DocumentDescription[] documentDescriptions)
+    throws ManifoldCFException
+  {
+    Long[] ids = new Long[documentDescriptions.length];
+    String[] docIDHashes = new String[documentDescriptions.length];
+
+    // First loop maps document identifier back to an index.
+    HashMap indexMap = new HashMap();
+    int i = 0;
+    while (i < documentDescriptions.length)
+    {
+      docIDHashes[i] =documentDescriptions[i].getDocumentIdentifierHash() + ":" + documentDescriptions[i].getJobID();
+      indexMap.put(docIDHashes[i],new Integer(i));
+      i++;
+    }
+
+    // Sort!
+    java.util.Arrays.sort(docIDHashes);
+
+    // Next loop populates the actual arrays we use to feed the operation so that the ordering is correct.
+    i = 0;
+    while (i < docIDHashes.length)
+    {
+      String docIDHash = docIDHashes[i];
+      Integer x = (Integer)indexMap.remove(docIDHash);
+      if (x == null)
+        throw new ManifoldCFException("Assertion failure: duplicate document identifier jobid/hash detected!");
+      int index = x.intValue();
+      ids[i] = documentDescriptions[index].getID();
+      i++;
+    }
+
+    // Documents get marked PURGATORY regardless of their current state; this is because we can't know at this point what the actual prior state was.
+    while (true)
+    {
+      long sleepAmt = 0L;
+      database.beginTransaction();
+      try
+      {
+        // Going through ids in order should greatly reduce or eliminate chances of deadlock occurring.  We thus need to pay attention to the sorted order.
+        i = 0;
+        while (i < ids.length)
+        {
+          jobQueue.setUncleaningStatus(ids[i]);
+          i++;
+        }
+
+        break;
+      }
+      catch (ManifoldCFException e)
+      {
+        database.signalRollback();
+        if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+        {
+          if (Logging.perf.isDebugEnabled())
+            Logging.perf.debug("Aborted transaction resetting cleaning documents: "+e.getMessage());
+          sleepAmt = getRandomAmount();
+          continue;
+        }
+        throw e;
+      }
+      catch (Error e)
+      {
+        database.signalRollback();
+        throw e;
+      }
+      finally
+      {
+        database.endTransaction();
+        sleepFor(sleepAmt);
+      }
+    }
+  }
+
+  /** Reset a cleaning document back to its former state.
+  * This gets done when a cleaning thread sees a service interruption, etc., from the ingestion system.
+  */
+  public void resetCleaningDocument(DocumentDescription documentDescription)
+    throws ManifoldCFException
+  {
+    resetCleaningDocumentMultiple(new DocumentDescription[]{documentDescription});
+  }
+
   /** Reset a set of deleting documents for further processing in the future.
   * This method is called after some unknown number of the documents were deleted, but then an ingestion service interruption occurred.
   * Note well: The logic here basically presumes that we cannot know whether the documents were indeed processed or not.
@@ -5113,7 +5401,7 @@ public class JobManager implements IJobM
       // This method must find only jobs that have nothing hanging around in their jobqueue that represents an ingested
       // document.  Any jobqueue entries which are in a state to interfere with the delete will be cleaned up by other
       // threads, so eventually a job will become eligible.  This happens when there are no records that have an ingested
-      // status: complete, purgatory, being-deleted, or pending purgatory.
+      // status: complete, purgatory, being-cleaned, being-deleted, or pending purgatory.
       database.beginTransaction();
       try
       {
@@ -5152,12 +5440,15 @@ public class JobManager implements IJobM
           list.add(jobQueue.statusToString(jobQueue.STATUS_PENDINGPURGATORY));
           list.add(jobID);
           list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+          list.add(jobID);
+          list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
 
           IResultSet confirmSet = database.performQuery("SELECT "+jobQueue.idField+" FROM "+
             jobQueue.getTableName()+" WHERE "+
             "("+jobQueue.jobIDField+"=? AND "+jobQueue.statusField+"=?) OR "+
             "("+jobQueue.jobIDField+"=? AND "+jobQueue.statusField+"=?) OR "+
             "("+jobQueue.jobIDField+"=? AND "+jobQueue.statusField+"=?) OR "+
+            "("+jobQueue.jobIDField+"=? AND "+jobQueue.statusField+"=?) OR "+
             "("+jobQueue.jobIDField+"=? AND "+jobQueue.statusField+"=?) "+database.constructOffsetLimitClause(0,1),list,null,null,1,null);
 
           if (confirmSet.getRowCount() > 0)
@@ -5438,8 +5729,12 @@ public class JobManager implements IJobM
     }
   }
 
-  /** Reset eligible jobs back to "inactive" state.  This method is used to pick up all jobs in the shutting down state
-  * whose purgatory or being-deleted records have been all cleaned up.
+  /** Reset eligible jobs either back to the "inactive" state, or make them active again.  The
+  * latter will occur if the cleanup phase of the job generated more pending documents.
+  *
+  *  This method is used to pick up all jobs in the shutting down state
+  * whose purgatory or being-cleaned records have been all processed.
+  *
   *@param currentTime is the current time in milliseconds since epoch.
   *@param resetJobs is filled in with the set of IJobDescription objects that were reset.
   */
@@ -5477,7 +5772,7 @@ public class JobManager implements IJobM
           list.add(jobID);
           list.add(jobQueue.statusToString(jobQueue.STATUS_PURGATORY));
           list.add(jobID);
-          list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+          list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
 
           IResultSet confirmSet = database.performQuery("SELECT "+jobQueue.idField+" FROM "+
             jobQueue.getTableName()+" WHERE "+
@@ -5487,14 +5782,40 @@ public class JobManager implements IJobM
           if (confirmSet.getRowCount() > 0)
             continue;
 
-          IJobDescription jobDesc = jobs.load(jobID,true);
-          resetJobs.add(jobDesc);
-          
-          // Label the job "finished"
-          jobs.finishJob(jobID,currentTime);
-          if (Logging.jobs.isDebugEnabled())
+          // The shutting-down phase is complete.  However, we need to check if there are any outstanding
+          // PENDING or PENDINGPURGATORY records before we can decide what to do.
+          list.clear();
+          list.add(jobID);
+          list.add(jobQueue.statusToString(jobQueue.STATUS_PENDING));
+          list.add(jobID);
+          list.add(jobQueue.statusToString(jobQueue.STATUS_PENDINGPURGATORY));
+
+          confirmSet = database.performQuery("SELECT "+jobQueue.idField+" FROM "+
+            jobQueue.getTableName()+" WHERE "+
+            "("+jobQueue.jobIDField+"=? AND "+jobQueue.statusField+"=?) OR "+
+            "("+jobQueue.jobIDField+"=? AND "+jobQueue.statusField+"=?) "+database.constructOffsetLimitClause(0,1),list,null,null,1,null);
+
+          if (confirmSet.getRowCount() > 0)
           {
-            Logging.jobs.debug("Job "+jobID+" now completed");
+            // This job needs to re-enter the active state.  Make that happen.
+            jobs.returnJobToActive(jobID);
+            if (Logging.jobs.isDebugEnabled())
+            {
+              Logging.jobs.debug("Job "+jobID+" is re-entering active state");
+            }
+          }
+          else
+          {
+            // This job should be marked as finished.
+            IJobDescription jobDesc = jobs.load(jobID,true);
+            resetJobs.add(jobDesc);
+            
+            // Label the job "finished"
+            jobs.finishJob(jobID,currentTime);
+            if (Logging.jobs.isDebugEnabled())
+            {
+              Logging.jobs.debug("Job "+jobID+" now completed");
+            }
           }
         }
         return;
@@ -5804,6 +6125,7 @@ public class JobManager implements IJobM
     list.add(jobQueue.statusToString(jobQueue.STATUS_COMPLETE));
     list.add(jobQueue.statusToString(jobQueue.STATUS_PURGATORY));
     list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+    list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
     
     list.add(jobQueue.statusToString(jobQueue.STATUS_COMPLETE));
     list.add(jobQueue.statusToString(jobQueue.STATUS_PURGATORY));
@@ -5857,6 +6179,7 @@ public class JobManager implements IJobM
       .append(" WHEN ").append("t0.").append(jobQueue.statusField).append("=? THEN 'Processed'")
       .append(" WHEN ").append("t0.").append(jobQueue.statusField).append("=? THEN 'Processed'")
       .append(" WHEN ").append("t0.").append(jobQueue.statusField).append("=? THEN 'Being removed'")
+      .append(" WHEN ").append("t0.").append(jobQueue.statusField).append("=? THEN 'Being removed'")
       .append(" ELSE 'Unknown'")
       .append(" END AS state,")
       .append("CASE")
@@ -5982,6 +6305,7 @@ public class JobManager implements IJobM
     list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCANPURGATORY));
     
     list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+    list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
     
     list.add(jobQueue.actionToString(jobQueue.ACTION_RESCAN));
     list.add(jobQueue.statusToString(jobQueue.STATUS_PENDING));
@@ -6035,6 +6359,7 @@ public class JobManager implements IJobM
       .append("CASE")
       .append(" WHEN ")
       .append(jobQueue.statusField).append("=?")
+      .append(" OR ").append(jobQueue.statusField).append("=?")
       .append(" THEN 1 ELSE 0")
       .append(" END")
       .append(" as deleting,")
@@ -6177,9 +6502,10 @@ public class JobManager implements IJobM
         list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVEPURGATORY));
         list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCANPURGATORY));
         list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+        list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
         list.add(jobQueue.statusToString(jobQueue.STATUS_COMPLETE));
         list.add(jobQueue.statusToString(jobQueue.STATUS_PURGATORY));
-        sb.append(fieldPrefix).append(jobQueue.statusField).append(" IN (?,?,?,?,?,?)");
+        sb.append(fieldPrefix).append(jobQueue.statusField).append(" IN (?,?,?,?,?,?,?)");
         break;
       }
       k++;
@@ -6221,7 +6547,8 @@ public class JobManager implements IJobM
         break;
       case DOCSTATUS_DELETING:
         list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
-        sb.append(fieldPrefix).append(jobQueue.statusField).append("=?");
+        list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
+        sb.append(fieldPrefix).append(jobQueue.statusField).append(" IN (?,?)");
         break;
       case DOCSTATUS_READYFORPROCESSING:
         list.add(jobQueue.statusToString(jobQueue.STATUS_PENDING));

Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java?rev=1057076&r1=1057075&r2=1057076&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java Mon Jan 10 02:35:42 2011
@@ -45,6 +45,7 @@ public class JobQueue extends org.apache
   public final static int STATUS_BEINGDELETED = 6;
   public final static int STATUS_ACTIVENEEDRESCAN = 7;
   public final static int STATUS_ACTIVENEEDRESCANPURGATORY = 8;
+  public final static int STATUS_BEINGCLEANED = 9;
 
   // Action values
   public final static int ACTION_RESCAN = 0;
@@ -66,9 +67,12 @@ public class JobQueue extends org.apache
   // an aborted job.  On recovery, PENDING and ACTIVE records are deleted (since they were never
   // completed), while PENDINGPURGATORY and ACTIVEPURGATORY records are retained but get marked as PURGATORY.
   //
-  // BEINGDELETED means that the document is queued to be cleaned up because the owning job is being
+  // BEINGDELETED means that the document is queued because the owning job is being
   //   deleted.  It exists so that jobs that are active can avoid processing a document until the cleanup
   //   activity is done.
+  //
+  // BEINGCLEANED means that the document is queued because the owning job is in the SHUTTINGDOWN
+  //   state, and the document was never encountered during the crawl.
 
   // Field names
   public static final String idField = "id";
@@ -98,6 +102,7 @@ public class JobQueue extends org.apache
     statusMap.put("D",new Integer(STATUS_BEINGDELETED));
     statusMap.put("a",new Integer(STATUS_ACTIVENEEDRESCAN));
     statusMap.put("f",new Integer(STATUS_ACTIVENEEDRESCANPURGATORY));
+    statusMap.put("d",new Integer(STATUS_BEINGCLEANED));
   }
 
   protected static Map seedstatusMap;
@@ -307,6 +312,12 @@ public class JobQueue extends org.apache
     list.add(statusToString(STATUS_BEINGDELETED));
     performUpdate(map,"WHERE "+statusField+"=?",list,null);
 
+    // Map BEINGCLEANED to PURGATORY
+    map.put(statusField,statusToString(STATUS_PURGATORY));
+    list.clear();
+    list.add(statusToString(STATUS_BEINGCLEANED));
+    performUpdate(map,"WHERE "+statusField+"=?",list,null);
+
     // Map newseed fields to seed
     map.put(isSeedField,seedstatusToString(SEEDSTATUS_SEED));
     list.clear();
@@ -376,6 +387,20 @@ public class JobQueue extends org.apache
     performUpdate(map,"WHERE "+statusField+"=?",list,null);
   }
 
+  /** Reset doc cleanup worker status.  Every jobqueue row that is still marked
+  * BEINGCLEANED is moved back to PURGATORY.
+  */
+  public void resetDocCleanupWorkerStatus()
+    throws ManifoldCFException
+  {
+    // Rewrite status BEINGCLEANED -> PURGATORY for all matching rows.
+    ArrayList whereParams = new ArrayList();
+    whereParams.add(statusToString(STATUS_BEINGCLEANED));
+    HashMap newValues = new HashMap();
+    newValues.put(statusField,statusToString(STATUS_PURGATORY));
+    performUpdate(newValues,"WHERE "+statusField+"=?",whereParams,null);
+  }
+
   /** Prepare for a "full scan" job.  This will not be called
   * unless the job is in the "INACTIVE" state.
   * This does the following:
@@ -642,6 +667,34 @@ public class JobQueue extends org.apache
     performUpdate(map,"WHERE "+idField+"=?",list,null);
   }
 
+  /** Set the status of a document to "being cleaned" (BEINGCLEANED).
+  *@param id is the jobqueue row identifier. */
+  public void setCleaningStatus(Long id)
+    throws ManifoldCFException
+  {
+    HashMap newValues = new HashMap();
+    newValues.put(statusField,statusToString(STATUS_BEINGCLEANED));
+    ArrayList idParam = new ArrayList();
+    idParam.add(id);
+    performUpdate(newValues,"WHERE "+idField+"=?",idParam,null);
+    noteModifications(0,1,0);
+  }
+
+  /** Set the status of a document to be "no longer cleaning": it returns to PURGATORY, and its
+  * check time, check action, fail time and fail count are all cleared.
+  *@param id is the jobqueue row identifier. */
+  public void setUncleaningStatus(Long id)
+    throws ManifoldCFException
+  {
+    HashMap newValues = new HashMap();
+    newValues.put(statusField,statusToString(STATUS_PURGATORY));
+    for (String clearedField : new String[]{checkTimeField,checkActionField,failTimeField,failCountField})
+      newValues.put(clearedField,null);
+    ArrayList idParam = new ArrayList();
+    idParam.add(id);
+    performUpdate(newValues,"WHERE "+idField+"=?",idParam,null);
+  }
+
   /** Remove multiple records entirely.
   *@param ids is the set of job queue id's
   */
@@ -715,12 +768,14 @@ public class JobQueue extends org.apache
     case STATUS_ACTIVEPURGATORY:
     case STATUS_ACTIVENEEDRESCAN:
     case STATUS_ACTIVENEEDRESCANPURGATORY:
+    case STATUS_BEINGCLEANED:
+      // These are all the active states.  Being in this state implies that a thread may be working on the document.  We
+      // must not interrupt it.
       // Initial adds never bring along any carrydown info, so we should be satisfied as long as the record exists.
       break;
 
     case STATUS_COMPLETE:
     case STATUS_PURGATORY:
-    case STATUS_BEINGDELETED:
       // Set the status and time both
       map.put(statusField,statusToString(STATUS_PENDINGPURGATORY));
       if (desiredExecuteTime == -1L)
@@ -996,6 +1051,7 @@ public class JobQueue extends org.apache
       rval = true;
       break;
     case STATUS_COMPLETE:
+    case STATUS_BEINGCLEANED:
       // Requeue the document for processing, if there have been other changes.
       if (otherChangesSeen)
       {
@@ -1222,6 +1278,8 @@ public class JobQueue extends org.apache
       return "a";
     case STATUS_ACTIVENEEDRESCANPURGATORY:
       return "f";
+    case STATUS_BEINGCLEANED:
+      return "d";
     default:
       throw new ManifoldCFException("Bad status value: "+Integer.toString(status));
     }

Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java?rev=1057076&r1=1057075&r2=1057076&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java Mon Jan 10 02:35:42 2011
@@ -1115,6 +1115,67 @@ public class Jobs extends org.apache.man
     performUpdate(map,"WHERE "+idField+"=?",list,new StringSet(getJobStatusKey()));
   }
 
+  /** Move a job that is in the SHUTTINGDOWN state back into an active state.  JobManager does
+  * this when the cleanup phase of a job has left PENDING or PENDINGPURGATORY documents behind.
+  * The active state chosen depends on which of the job's connectors are currently installed:
+  * both repository and output: ACTIVE; repository only: ACTIVE_NOOUTPUT;
+  * output only: ACTIVE_UNINSTALLED; neither: ACTIVE_NEITHER.
+  *@param jobID is the job identifier.
+  */
+  public void returnJobToActive(Long jobID)
+    throws ManifoldCFException
+  {
+    beginTransaction();
+    try
+    {
+      ArrayList params = new ArrayList();
+      params.add(jobID);
+      // Lock the job row (FOR UPDATE) for the remainder of this transaction.
+      IResultSet jobSet = performQuery("SELECT "+statusField+","+connectionNameField+","+outputNameField+" FROM "+getTableName()+" WHERE "+
+        idField+"=? FOR UPDATE",params,null,null);
+      if (jobSet.getRowCount() == 0)
+        throw new ManifoldCFException("Can't find job "+jobID.toString());
+      IResultRow jobRow = jobSet.getRow(0);
+      int currentStatus = stringToStatus((String)jobRow.getValue(statusField));
+      // Only a job that is shutting down may be returned to the active state.
+      if (currentStatus != STATUS_SHUTTINGDOWN)
+        throw new ManifoldCFException("Unexpected job status encountered: "+Integer.toString(currentStatus));
+
+      // Probe the repository connector first, then the output connector.
+      boolean repositoryInstalled = connectionMgr.checkConnectorExists((String)jobRow.getValue(connectionNameField));
+      boolean outputInstalled = outputMgr.checkConnectorExists((String)jobRow.getValue(outputNameField));
+      int activeStatus;
+      if (repositoryInstalled && outputInstalled)
+        activeStatus = STATUS_ACTIVE;
+      else if (repositoryInstalled)
+        activeStatus = STATUS_ACTIVE_NOOUTPUT;
+      else if (outputInstalled)
+        activeStatus = STATUS_ACTIVE_UNINSTALLED;
+      else
+        activeStatus = STATUS_ACTIVE_NEITHER;
+
+      // Persist the new status; the job-status key is passed as in the other status updates.
+      HashMap newValues = new HashMap();
+      newValues.put(statusField,statusToString(activeStatus));
+      performUpdate(newValues,"WHERE "+idField+"=?",params,new StringSet(getJobStatusKey()));
+    }
+    catch (ManifoldCFException e)
+    {
+      // Undo any partial work before propagating the failure.
+      signalRollback();
+      throw e;
+    }
+    catch (Error e)
+    {
+      signalRollback();
+      throw e;
+    }
+    finally
+    {
+      endTransaction();
+    }
+  }
+  
   /** Make job active, and set the start time field.
   *@param jobID is the job identifier.
   *@param startTime is the current time in milliseconds from start of epoch.
@@ -1763,8 +1824,8 @@ public class Jobs extends org.apache.man
 
   }
 
-  /** Return true if there is a job in either the READYFORDELETE state or the
-  * SHUTTINGDOWN state.  (This matches the conditions for values to be returned from
+  /** Return true if there is a job in the READYFORDELETE state.  (This matches the
+  * conditions for values to be returned from
   * getNextDeletableDocuments).
   *@return true if such jobs exist.
   */
@@ -1773,9 +1834,24 @@ public class Jobs extends org.apache.man
   {
     ArrayList list = new ArrayList();
     list.add(statusToString(STATUS_READYFORDELETE));
+    IResultSet set = performQuery("SELECT "+idField+" FROM "+getTableName()+" WHERE "+
+      statusField+"=? "+constructOffsetLimitClause(0,1),
+      list,new StringSet(getJobStatusKey()),null,1);
+    return set.getRowCount() > 0;
+  }
+
+  /** Check whether any job is currently in the SHUTTINGDOWN state.  (This matches the
+  * conditions under which documents can be returned from getNextCleanableDocuments.)
+  * The query is limited to a single row, so it stops at the first matching job.
+  *@return true if at least one such job exists.
+  */
+  public boolean cleaningJobsPresent()
+    throws ManifoldCFException
+  {
+    ArrayList list = new ArrayList();
     list.add(statusToString(STATUS_SHUTTINGDOWN));
     IResultSet set = performQuery("SELECT "+idField+" FROM "+getTableName()+" WHERE "+
-      statusField+" IN (?,?) "+constructOffsetLimitClause(0,1),
+      statusField+"=? "+constructOffsetLimitClause(0,1),
       list,new StringSet(getJobStatusKey()),null,1);
     return set.getRowCount() > 0;
   }

Added: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CleanupQueuedDocument.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CleanupQueuedDocument.java?rev=1057076&view=auto
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CleanupQueuedDocument.java (added)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CleanupQueuedDocument.java Mon Jan 10 02:35:42 2011
@@ -0,0 +1,53 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.crawler.system;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.agents.interfaces.*;
+import org.apache.manifoldcf.crawler.interfaces.*;
+
+/** A document placed on the document cleanup queue, to be processed by a cleanup worker thread.
+* It also records whether the document should be removed from the index.
+*/
+public class CleanupQueuedDocument extends DeleteQueuedDocument
+{
+  public static final String _rcsid = "@(#)$Id$";
+
+  /** True if the document should be deleted from the index as part of cleanup. */
+  protected boolean deleteFromIndex;
+
+  /** Constructor.
+  *@param documentDescription is the document description.
+  *@param deleteFromIndex is true if the document should also be removed from the index.
+  */
+  public CleanupQueuedDocument(DocumentDescription documentDescription, boolean deleteFromIndex)
+  {
+    super(documentDescription);
+    this.deleteFromIndex = deleteFromIndex;
+  }
+
+  /** Check if document should be removed from the index.
+  *@return true if it should be removed.
+  */
+  public boolean shouldBeRemovedFromIndex()
+  {
+    return deleteFromIndex;
+  }
+}
+

Propchange: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CleanupQueuedDocument.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CleanupQueuedDocument.java
------------------------------------------------------------------------------
    svn:keywords = Id

Added: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocCleanupResetManager.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocCleanupResetManager.java?rev=1057076&view=auto
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocCleanupResetManager.java (added)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocCleanupResetManager.java Mon Jan 10 02:35:42 2011
@@ -0,0 +1,51 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.crawler.system;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.crawler.interfaces.*;
+import java.io.*;
+import java.util.*;
+
+/** Reset manager for the document cleanup thread pool.  The reset action asks the job manager to move
+* documents from BEINGCLEANED ("being cleaned") back to PURGATORY, then empties the cleanup queue.
+*/
+public class DocCleanupResetManager extends ResetManager
+{
+  public static final String _rcsid = "@(#)$Id$";
+
+  protected DocumentCleanupQueue ddq;
+
+  /** Constructor.
+  *@param ddq is the document cleanup queue, which is cleared on every reset. */
+  public DocCleanupResetManager(DocumentCleanupQueue ddq)
+  {
+    super();
+    this.ddq = ddq;
+  }
+
+  /** Reset: ask the job manager to reset cleanup-worker document status, then clear the queue. */
+  protected void performResetLogic(IThreadContext tc)
+    throws ManifoldCFException
+  {
+    IJobManager jobManager = JobManagerFactory.make(tc);
+    jobManager.resetDocCleanupWorkerStatus();
+    ddq.clear();
+  }
+}

Propchange: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocCleanupResetManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocCleanupResetManager.java
------------------------------------------------------------------------------
    svn:keywords = Id

Added: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupQueue.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupQueue.java?rev=1057076&view=auto
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupQueue.java (added)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupQueue.java Mon Jan 10 02:35:42 2011
@@ -0,0 +1,112 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.crawler.system;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.crawler.interfaces.*;
+import java.util.*;
+
+/** A document cleanup queue, fed by a single "stuffer" thread and drained by many "reader" threads.
+* The "reader" threads block in getDocuments() while the queue is empty; checkIfEmpty() lets the
+* "stuffer" thread test the queue against a low-water mark.  reset() wakes up any blocked readers.
+* The objects being queued are all DocumentCleanupSet objects.
+*/
+public class DocumentCleanupQueue
+{
+  public static final String _rcsid = "@(#)$Id$";
+
+  // The stuffer waits for the queue to drain before adding more, so an ArrayList is a fine way to keep it
+  protected ArrayList queue = new ArrayList();
+
+  /** Constructor.
+  */
+  public DocumentCleanupQueue()
+  {
+  }
+
+  /** Wake up all threads waiting on this queue.  This happens at the beginning of a reset.
+  * Readers blocked in getDocuments() will return null if the queue is still empty.
+  */
+  public void reset()
+  {
+    synchronized (queue)
+    {
+      queue.notifyAll();
+    }
+  }
+
+  /** Discard every queued document set.  This is only used on reset.
+  */
+  public void clear()
+  {
+    synchronized (queue)
+    {
+      queue.clear();
+    }
+  }
+
+  /** Check if the queue is "empty", i.e. at or below a low-water mark.
+  *@param n is the low-water mark; if the number of queued sets is at or below this, return true.
+  *@return true if the queue holds n or fewer document sets.
+  */
+  public boolean checkIfEmpty(int n)
+  {
+    synchronized (queue)
+    {
+      return queue.size() <= n;
+    }
+  }
+
+  /** Add a document set to the queue, and wake up one waiting reader.  This will be a set of
+  * n documents (where n is some chunk size set by experiment).
+  *@param dd is the document set.
+  */
+  public void addDocuments(DocumentCleanupSet dd)
+  {
+    synchronized (queue)
+    {
+      queue.add(dd);
+      queue.notify();
+    }
+  }
+
+  /** Pull a document set off the queue, waiting if there is nothing there.
+  * If the wait is ended by reset() (or a spurious wakeup) and the queue is still empty, null is
+  * returned instead of blocking again, so callers must be prepared for a null result.
+  *@return the most recently added document set, or null if there was none after waiting.
+  */
+  public DocumentCleanupSet getDocuments()
+    throws InterruptedException
+  {
+    synchronized (queue)
+    {
+      // Wait at most once: a "while" loop here would put threads woken by reset() straight back
+      // to sleep, making the null (reset) return below unreachable.
+      if (queue.size() == 0)
+        queue.wait();
+      // Either there's an entry to grab, or we were awakened because it's time to reset.
+      if (queue.size() == 0)
+        return null;
+      // Entries are taken from the end of the list, i.e. most recently added first.
+      DocumentCleanupSet dd = (DocumentCleanupSet)queue.remove(queue.size()-1);
+      return dd;
+    }
+  }
+
+}

Propchange: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupQueue.java
------------------------------------------------------------------------------
    svn:keywords = Id

Added: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupSet.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupSet.java?rev=1057076&view=auto
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupSet.java (added)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupSet.java Mon Jan 10 02:35:42 2011
@@ -0,0 +1,61 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.crawler.system;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.agents.interfaces.*;
+import org.apache.manifoldcf.crawler.interfaces.*;
+import java.util.*;
+
+/** This class is what's actually queued for cleanup threads.  It wraps an array of CleanupQueuedDocument
+* objects, of an appropriate size to be a decent chunk, which is processed in bulk by a single cleanup worker thread.
+*/
+public class DocumentCleanupSet
+{
+  public static final String _rcsid = "@(#)$Id$";
+
+  /** The documents to be cleaned up in this chunk. */
+  protected CleanupQueuedDocument[] documents;
+
+  /** Constructor.  The array is stored as-is (not copied).
+  *@param documents is the array of documents making up this chunk.
+  */
+  public DocumentCleanupSet(CleanupQueuedDocument[] documents)
+  {
+    this.documents = documents;
+  }
+
+  /** Get the number of documents in this chunk.
+  *@return the number.
+  */
+  public int getCount()
+  {
+    return documents.length;
+  }
+
+  /** Get the nth document.
+  *@param index is the zero-based document number, from 0 to getCount()-1.
+  *@return the document.
+  */
+  public CleanupQueuedDocument getDocument(int index)
+  {
+    return documents[index];
+  }
+
+}

Propchange: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupSet.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupSet.java
------------------------------------------------------------------------------
    svn:keywords = Id

Added: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupStufferThread.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupStufferThread.java?rev=1057076&view=auto
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupStufferThread.java (added)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupStufferThread.java Mon Jan 10 02:35:42 2011
@@ -0,0 +1,194 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.crawler.system;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.agents.interfaces.*;
+import org.apache.manifoldcf.crawler.interfaces.*;
+import org.apache.manifoldcf.crawler.system.Logging;
+import java.util.*;
+import java.lang.reflect.*;
+
+/** This class looks for documents that need to be cleaned up (as part of an end-of-job cleanup), and
+* queues them up for the various document cleanup threads to take care of.
+* To do this, this thread repeatedly asks the job manager for a chunk of cleanable documents, wraps
+* that chunk in a DocumentCleanupSet, and adds it to the document cleanup queue, where the
+* individual document cleanup threads are waiting.
+* While the queue still holds enough work (as judged by DocumentCleanupQueue.checkIfEmpty()),
+* this thread just polls, sleeping briefly between checks.
+*/
+public class DocumentCleanupStufferThread extends Thread
+{
+  public static final String _rcsid = "@(#)$Id$";
+
+  // Local data
+  // This is a reference to the static main document queue
+  protected DocumentCleanupQueue documentCleanupQueue;
+  // This is the reset manager
+  protected DocCleanupResetManager resetManager;
+  // Passed to documentCleanupQueue.checkIfEmpty() to decide when the queue needs more work;
+  // per the constructor contract, this is the number of cleanup worker threads.
+  int n;
+
+  /** Constructor.
+  *@param documentCleanupQueue is the document queue we'll be stuffing.
+  *@param n is the maximum number of threads that will be doing cleanup processing.
+  *@param resetManager is the reset manager this thread registers with, waits on, and notifies
+  * when a database connection error occurs.
+  */
+  public DocumentCleanupStufferThread(DocumentCleanupQueue documentCleanupQueue, int n, DocCleanupResetManager resetManager)
+    throws ManifoldCFException
+  {
+    super();
+    this.documentCleanupQueue = documentCleanupQueue;
+    this.n = n;
+    this.resetManager = resetManager;
+    setName("Document cleanup stuffer thread");
+    setDaemon(true);
+  }
+
+  /** Main loop.  Waits until the cleanup queue needs more work, then fetches the next chunk of
+  * cleanable documents and queues it.  Runs until interrupted.
+  */
+  public void run()
+  {
+    resetManager.registerMe();
+
+    try
+    {
+      // Create a thread context object.
+      IThreadContext threadContext = ThreadContextFactory.make();
+      IJobManager jobManager = JobManagerFactory.make(threadContext);
+
+      IDBInterface database = DBInterfaceFactory.make(threadContext,
+        ManifoldCF.getMasterDatabaseName(),
+        ManifoldCF.getMasterDatabaseUsername(),
+        ManifoldCF.getMasterDatabasePassword());
+
+      // The chunk size is tied to the database's maximum IN-clause size
+      // (presumably so that one chunk can be handled by a single IN clause).
+      int deleteChunkSize = database.getMaxInClause();
+
+      // Loop
+      while (true)
+      {
+        // Do another try/catch around everything in the loop
+        try
+        {
+          resetManager.waitForReset(threadContext);
+
+          // Wait until the cleanup queue is "empty" (meaning that some cleanup threads
+          // can run out of work if we don't act).
+          if (!documentCleanupQueue.checkIfEmpty(n))
+          {
+            ManifoldCF.sleep(100L);
+            continue;
+          }
+
+          Logging.threads.debug("Document cleanup stuffer thread woke up");
+
+          // This method will set the status of the documents in question
+          // to "beingcleaned".
+
+          // Get a single chunk at a time (but keep going until everything is stuffed)
+          DocumentSetAndFlags documentsToClean = jobManager.getNextCleanableDocuments(deleteChunkSize);
+          DocumentDescription[] descs = documentsToClean.getDocumentSet();
+          boolean[] removeFromIndex = documentsToClean.getFlags();
+
+          // If there are no chunks at all, then we can sleep for a while.
+          // The theory is that we need to allow stuff to accumulate.
+          if (descs.length == 0)
+          {
+            Logging.threads.debug("Document cleanup stuffer thread found nothing to do");
+            ManifoldCF.sleep(1000L);       // 1 second
+            continue;
+          }
+
+          if (Logging.threads.isDebugEnabled())
+            Logging.threads.debug("Document cleanup stuffer thread found "+Integer.toString(descs.length)+" documents");
+
+          // Do the stuffing.  Each document is paired with the flag at the same position
+          // (assumes getFlags() is parallel to getDocumentSet() - TODO confirm).
+          CleanupQueuedDocument[] docDescs = new CleanupQueuedDocument[descs.length];
+          for (int k = 0; k < docDescs.length; k++)
+          {
+            docDescs[k] = new CleanupQueuedDocument(descs[k],removeFromIndex[k]);
+          }
+          documentCleanupQueue.addDocuments(new DocumentCleanupSet(docDescs));
+
+          // If we don't wait here, the other threads don't have a chance to queue anything else up.
+          // Must be qualified: an unqualified yield() call is rejected by Java 14+ compilers.
+          Thread.yield();
+        }
+        catch (ManifoldCFException e)
+        {
+          if (e.getErrorCode() == ManifoldCFException.INTERRUPTED)
+            break;
+
+          if (e.getErrorCode() == ManifoldCFException.DATABASE_CONNECTION_ERROR)
+          {
+            resetManager.noteEvent();
+            documentCleanupQueue.reset();
+
+            Logging.threads.error("Cleanup stuffer thread aborting and restarting due to database connection reset",e);
+            try
+            {
+              // Give the database a chance to catch up/wake up
+              ManifoldCF.sleep(10000L);
+            }
+            catch (InterruptedException se)
+            {
+              break;
+            }
+            continue;
+          }
+
+          // Log it, but keep the thread alive
+          Logging.threads.error("Exception tossed: "+e.getMessage(),e);
+
+          if (e.getErrorCode() == ManifoldCFException.SETUP_ERROR)
+          {
+            // Shut the whole system down!
+            System.exit(1);
+          }
+
+        }
+        catch (InterruptedException e)
+        {
+          // We're supposed to quit
+          break;
+        }
+        catch (OutOfMemoryError e)
+        {
+          System.err.println("agents process ran out of memory - shutting down");
+          e.printStackTrace(System.err);
+          System.exit(-200);
+        }
+        catch (Throwable e)
+        {
+          // A more severe error - but stay alive
+          Logging.threads.fatal("Error tossed: "+e.getMessage(),e);
+        }
+      }
+    }
+    catch (Throwable e)
+    {
+      // Severe error on initialization
+      System.err.println("agents process could not start - shutting down");
+      Logging.threads.fatal("DocumentCleanupStufferThread initialization error tossed: "+e.getMessage(),e);
+      System.exit(-300);
+    }
+
+  }
+
+}

Propchange: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupStufferThread.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupStufferThread.java
------------------------------------------------------------------------------
    svn:keywords = Id