You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2011/01/10 10:24:00 UTC

svn commit: r1057136 - in /incubator/lcf/branches/release-0.1-incubating-branch: ./ framework/core/src/main/java/org/apache/manifoldcf/core/database/ framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/ framework/pull-agent/src/...

Author: kwright
Date: Mon Jan 10 09:24:00 2011
New Revision: 1057136

URL: http://svn.apache.org/viewvc?rev=1057136&view=rev
Log:
Pull up fixes for tickets CONNECTORS-146 and CONNECTORS-148 into the release branch.

Added:
    incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/DocumentSetAndFlags.java
      - copied unchanged from r1057076, incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/DocumentSetAndFlags.java
    incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CleanupQueuedDocument.java
      - copied unchanged from r1057076, incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/CleanupQueuedDocument.java
    incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocCleanupResetManager.java
      - copied unchanged from r1057076, incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocCleanupResetManager.java
    incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupQueue.java
      - copied unchanged from r1057076, incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupQueue.java
    incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupSet.java
      - copied unchanged from r1057076, incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupSet.java
    incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupStufferThread.java
      - copied unchanged from r1057076, incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupStufferThread.java
    incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java
      - copied unchanged from r1057076, incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java
Modified:
    incubator/lcf/branches/release-0.1-incubating-branch/   (props changed)
    incubator/lcf/branches/release-0.1-incubating-branch/CHANGES.txt
    incubator/lcf/branches/release-0.1-incubating-branch/framework/core/src/main/java/org/apache/manifoldcf/core/database/DBInterfacePostgreSQL.java
    incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
    incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopCount.java
    incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
    incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
    incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
    incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java

Propchange: incubator/lcf/branches/release-0.1-incubating-branch/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jan 10 09:24:00 2011
@@ -1,2 +1,2 @@
 /incubator/lcf/branches/release-0.1-branch:1056045
-/incubator/lcf/trunk:1039159,1041674,1041679,1041763,1041885,1041968,1042383,1042836-1042837,1042896,1042898,1043728,1044276,1044287,1044294,1044641,1049834,1050183,1050605,1056045,1056054,1056104,1056116,1056131-1056139,1056157,1056170-1056173,1056195,1056245,1056382-1056385,1056474,1056738
+/incubator/lcf/trunk:1039159,1041674,1041679,1041763,1041885,1041968,1042383,1042836-1042837,1042896,1042898,1043728,1044276,1044287,1044294,1044641,1049834,1050183,1050605,1056045,1056054,1056104,1056116,1056131-1056139,1056157,1056170-1056173,1056195,1056245,1056382-1056385,1056474,1056738,1057076,1057120

Modified: incubator/lcf/branches/release-0.1-incubating-branch/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/release-0.1-incubating-branch/CHANGES.txt?rev=1057136&r1=1057135&r2=1057136&view=diff
==============================================================================
--- incubator/lcf/branches/release-0.1-incubating-branch/CHANGES.txt (original)
+++ incubator/lcf/branches/release-0.1-incubating-branch/CHANGES.txt Mon Jan 10 09:24:00 2011
@@ -4,6 +4,15 @@ $Id$
 ======================= Release 0.1 =======================
 Release Date:  See http://incubator.apache.org/connectors for the official release date.
 
+CONNECTORS-148: Creating the database in PostgreSQL failed intermittently
+when a bind parameter was used for the encoding; the PostgreSQL documentation
+specifies that the encoding in CREATE DATABASE must be a quoted string literal.
+(Karl Wright)
+
+CONNECTORS-146: A problem with the document cleanup logic could corrupt
+carrydown data and hopcount information.
+(Karl Wright)
+
 CONNECTORS-143: Copyright notice needs to be changed to 2011.
 (Sebb, Karl Wright)
 

Modified: incubator/lcf/branches/release-0.1-incubating-branch/framework/core/src/main/java/org/apache/manifoldcf/core/database/DBInterfacePostgreSQL.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/release-0.1-incubating-branch/framework/core/src/main/java/org/apache/manifoldcf/core/database/DBInterfacePostgreSQL.java?rev=1057136&r1=1057135&r2=1057136&view=diff
==============================================================================
--- incubator/lcf/branches/release-0.1-incubating-branch/framework/core/src/main/java/org/apache/manifoldcf/core/database/DBInterfacePostgreSQL.java (original)
+++ incubator/lcf/branches/release-0.1-incubating-branch/framework/core/src/main/java/org/apache/manifoldcf/core/database/DBInterfacePostgreSQL.java Mon Jan 10 09:24:00 2011
@@ -531,10 +531,8 @@ public class DBInterfacePostgreSQL exten
         null,null,null,true,-1,null,null);
       if (set.getRowCount() == 0)
       {
-        params.clear();
-        params.add("utf8");
-	masterDatabase.executeQuery("CREATE DATABASE "+databaseName+" OWNER="+
-	  userName+" ENCODING=?",params,null,invalidateKeys,null,false,0,null,null);
+	masterDatabase.executeQuery("CREATE DATABASE "+databaseName+" OWNER "+
+	  userName+" ENCODING 'utf8'",null,null,invalidateKeys,null,false,0,null,null);
       }
     }
     catch (ManifoldCFException e)

Modified: incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java?rev=1057136&r1=1057135&r2=1057136&view=diff
==============================================================================
--- incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java (original)
+++ incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java Mon Jan 10 09:24:00 2011
@@ -3,7 +3,7 @@
 /**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
+* this work for additional information regarding copyright ownership.f
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
@@ -158,6 +158,11 @@ public interface IJobManager
   public void resetDocDeleteWorkerStatus()
     throws ManifoldCFException;
 
+  /** Reset document-cleanup worker status as part of restoring doc cleanup threads.
+  */
+  public void resetDocCleanupWorkerStatus()
+    throws ManifoldCFException;
+
   /** Reset as part of restoring startup threads.
   */
   public void resetStartupWorkerStatus()
@@ -365,6 +370,23 @@ public interface IJobManager
   public void resetDeletingDocument(DocumentDescription documentDescription)
     throws ManifoldCFException;
 
+  /** Reset a cleaning document so that it will be processed again in the future.
+  * This gets done when a cleaning thread sees a service interruption, etc., from the ingestion system.
+  */
+  public void resetCleaningDocument(DocumentDescription documentDescription)
+    throws ManifoldCFException;
+
+  /** Reset a set of cleaning documents for further processing in the future.
+  * This method is called after some unknown number of the documents were cleaned, but then an ingestion service interruption occurred.
+  * Note well: The logic here basically presumes that we cannot know whether the documents were indeed cleaned or not.
+  * If we knew for a fact that none of the documents had been handled, it would be possible to look at the document's
+  * current status and decide what the new status ought to be, based on a true rollback scenario.  Such cases, however, are rare enough so that
+  * special logic is probably not worth it.
+  *@param documentDescriptions is the set of description objects for the documents that were being cleaned.
+  */
+  public void resetCleaningDocumentMultiple(DocumentDescription[] documentDescriptions)
+    throws ManifoldCFException;
+
   /** Add an initial set of documents to the queue.
   * This method is called during job startup, when the queue is being loaded.
   * A set of document references is passed to this method, which updates the status of the document
@@ -727,6 +749,14 @@ public interface IJobManager
   public DocumentDescription[] getNextDeletableDocuments(int n)
     throws ManifoldCFException;
 
+  /** Get list of cleanable document descriptions.  This list will take into account
+  * multiple jobs that may own the same document.
+  *@param n is the maximum number of documents to return.
+  *@return the document descriptions, each paired with a flag saying whether it may be removed from the index.
+  */
+  public DocumentSetAndFlags getNextCleanableDocuments(int n)
+    throws ManifoldCFException;
+
   /** Delete ingested document identifiers (as part of deleting the owning job).
   * The number of identifiers specified is guaranteed to be less than the maxInClauseCount
   * for the database.
@@ -755,8 +785,12 @@ public interface IJobManager
   public void finishJobs()
     throws ManifoldCFException;
 
-  /** Reset eligible jobs back to "inactive" state.  This method is used to pick up all jobs in the shutting down state
-  * whose purgatory records have been all cleaned up.
+  /** Reset eligible jobs either back to the "inactive" state, or make them active again.  The
+  * latter will occur if the cleanup phase of the job generated more pending documents.
+  *
+  *  This method is used to pick up all jobs in the shutting down state
+  * whose purgatory or being-cleaned records have been all processed.
+  *
   *@param currentTime is the current time in milliseconds since epoch.
   *@param resetJobs is filled in with the set of IJobDescription objects that were reset.
   */

Modified: incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopCount.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopCount.java?rev=1057136&r1=1057135&r2=1057136&view=diff
==============================================================================
--- incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopCount.java (original)
+++ incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopCount.java Mon Jan 10 09:24:00 2011
@@ -506,7 +506,7 @@ public class HopCount extends org.apache
   }
 
   /** Remove a set of document identifiers specified as a criteria.  This will remove hopcount rows and
-  * also intrinsic links that have the specified document identifiers are sources.
+  * also intrinsic links that have the specified document identifiers as sources.
   */
   public void deleteMatchingDocuments(Long jobID, String[] legalLinkTypes,
     String sourceTableName,

Modified: incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1057136&r1=1057135&r2=1057136&view=diff
==============================================================================
--- incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java (original)
+++ incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Mon Jan 10 09:24:00 2011
@@ -740,6 +740,16 @@ public class JobManager implements IJobM
     Logging.jobs.debug("Reset complete");
   }
 
+  /** Reset the job queue's doc-cleanup worker status as part of restoring doc cleanup threads.
+  */
+  public void resetDocCleanupWorkerStatus()
+    throws ManifoldCFException
+  {
+    Logging.jobs.debug("Resetting doc cleaning status");
+    jobQueue.resetDocCleanupWorkerStatus();
+    Logging.jobs.debug("Reset complete");
+  }
+
   /** Reset as part of restoring startup threads.
   */
   public void resetStartupWorkerStatus()
@@ -765,6 +775,198 @@ public class JobManager implements IJobM
     // carrydown records get removed when the job itself is removed.
   }
 
+  /** Get list of cleanable document descriptions.  This list will take into account
+  * multiple jobs that may own the same document.  All documents for which a description
+  * is returned will be transitioned to the "beingcleaned" state.  Each description is
+  * paired with a flag that is true only when the document may also be removed from the
+  * index (as determined per connection by getUnindexableDocumentIdentifiers()).
+  *@param maxCount is the maximum number of documents to return.
+  *@return the document descriptions, together with their index-removal flags.
+  */
+  public DocumentSetAndFlags getNextCleanableDocuments(int maxCount)
+    throws ManifoldCFException
+  {
+    // The query will be built here, because it joins the jobs table against the jobqueue
+    // table.
+    //
+    // This query must only pick up documents that are not active in any job and
+    // which belong to a job that's in a "shutting down" state and are in
+    // a "purgatory" state.
+    //
+    // We are in fact more conservative in this query than we need to be; the documents
+    // excluded will include some that simply match our criteria, which is designed to
+    // be fast rather than perfect.  The match we make is: hashvalue against hashvalue, and
+    // different job id's.
+    //
+    // SELECT id,jobid,docid FROM jobqueue t0 WHERE t0.status='P' AND EXISTS(SELECT 'x' FROM
+    //              jobs t3 WHERE t0.jobid=t3.id AND t3.status='X')
+    //      AND NOT EXISTS(SELECT 'x' FROM jobqueue t2 WHERE t0.hashval=t2.hashval AND t0.jobid!=t2.jobid
+    //              AND t2.status IN (<four active statuses>, being-deleted, being-cleaned))
+    //
+
+    // Do a simple preliminary query, since the big query is currently slow, so that we don't waste time during stasis or
+    // ingestion.
+    // Moved outside of transaction, so we have no chance of locking up job status cache key for an extended period of time.
+    if (!jobs.cleaningJobsPresent())
+      return new DocumentSetAndFlags(new DocumentDescription[0],new boolean[0]);
+
+    long startTime = 0L;
+    if (Logging.perf.isDebugEnabled())
+    {
+      startTime = System.currentTimeMillis();
+      Logging.perf.debug("Waiting to find documents to put on the cleaning queue");
+    }
+
+    while (true)
+    {
+      long sleepAmt = 0L;
+      database.beginTransaction();
+      try
+      {
+        if (Logging.perf.isDebugEnabled())
+          Logging.perf.debug("After "+new Long(System.currentTimeMillis()-startTime).toString()+" ms, beginning query to look for documents to put on cleaning queue");
+
+        // Note: This query does not do "FOR UPDATE", because it is running under the only thread that can possibly change the document's state to "being cleaned".
+        ArrayList list = new ArrayList();
+        list.add(jobQueue.statusToString(jobQueue.STATUS_PURGATORY));
+        
+        list.add(jobs.statusToString(jobs.STATUS_SHUTTINGDOWN));
+        
+        list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVE));
+        list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVEPURGATORY));
+        list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCAN));
+        list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCANPURGATORY));
+        list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+        list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
+        
+        IResultSet set = database.performQuery("SELECT "+jobQueue.idField+","+jobQueue.jobIDField+","+jobQueue.docHashField+","+jobQueue.docIDField+","+
+          jobQueue.failTimeField+","+jobQueue.failCountField+" FROM "+
+          jobQueue.getTableName()+" t0 WHERE t0."+jobQueue.statusField+"=? "+
+          " AND EXISTS(SELECT 'x' FROM "+jobs.getTableName()+" t1 WHERE t0."+jobQueue.jobIDField+"=t1."+jobs.idField+
+          " AND t1."+jobs.statusField+"=?"+
+          ") AND NOT EXISTS(SELECT 'x' FROM "+jobQueue.getTableName()+" t2 WHERE t0."+jobQueue.docHashField+"=t2."+
+          jobQueue.docHashField+" AND t0."+jobQueue.jobIDField+"!=t2."+jobQueue.jobIDField+
+          " AND t2."+jobQueue.statusField+" IN (?,?,?,?,?,?)) "+database.constructOffsetLimitClause(0,maxCount),
+          list,null,null,maxCount,null);
+
+        if (Logging.perf.isDebugEnabled())
+          Logging.perf.debug("Done getting docs to cleaning queue after "+new Long(System.currentTimeMillis()-startTime).toString()+" ms.");
+
+        // We need to organize the returned set by connection name, so that we can efficiently
+        // use  getUnindexableDocumentIdentifiers.
+        // This is a table keyed by connection name and containing an ArrayList, which in turn contains DocumentDescription
+        // objects.
+        HashMap connectionNameMap = new HashMap();
+        HashMap documentIDMap = new HashMap();
+        int i = 0;
+        while (i < set.getRowCount())
+        {
+          IResultRow row = set.getRow(i);
+          Long jobID = (Long)row.getValue(jobQueue.jobIDField);
+          String documentIDHash = (String)row.getValue(jobQueue.docHashField);
+          String documentID = (String)row.getValue(jobQueue.docIDField);
+          Long failTimeValue = (Long)row.getValue(jobQueue.failTimeField);
+          Long failCountValue = (Long)row.getValue(jobQueue.failCountField);
+          // Failtime is probably not useful in this context, but we'll bring it along for completeness
+          long failTime;
+          if (failTimeValue == null)
+            failTime = -1L;
+          else
+            failTime = failTimeValue.longValue();
+          int failCount;
+          if (failCountValue == null)
+            failCount = 0;
+          else
+            failCount = (int)failCountValue.longValue();
+          IJobDescription jobDesc = load(jobID);
+          String connectionName = jobDesc.getConnectionName();
+          DocumentDescription dd = new DocumentDescription((Long)row.getValue(jobQueue.idField),
+            jobID,documentIDHash,documentID,failTime,failCount);
+          documentIDMap.put(documentIDHash,dd);
+          ArrayList x = (ArrayList)connectionNameMap.get(connectionName);
+          if (x == null)
+          {
+            // New entry needed
+            x = new ArrayList();
+            connectionNameMap.put(connectionName,x);
+          }
+          x.add(dd);
+          i++;
+        }
+
+        // For each connection bin, obtain the filtered answer and enter all answers into a hash table.
+        // We'll then scan the result again to build the return value, flagging which documents
+        // may be removed from the index (those that getUnindexableDocumentIdentifiers returned).
+        HashMap allowedDocIds = new HashMap();
+        Iterator iter = connectionNameMap.keySet().iterator();
+        while (iter.hasNext())
+        {
+          String connectionName = (String)iter.next();
+          ArrayList x = (ArrayList)connectionNameMap.get(connectionName);
+          // Do the filter query
+          DocumentDescription[] descriptions = new DocumentDescription[x.size()];
+          int j = 0;
+          while (j < descriptions.length)
+          {
+            descriptions[j] = (DocumentDescription)x.get(j);
+            j++;
+          }
+          String[] docIDHashes = getUnindexableDocumentIdentifiers(descriptions,connectionName);
+          j = 0;
+          while (j < docIDHashes.length)
+          {
+            String docIDHash = docIDHashes[j++];
+            allowedDocIds.put(docIDHash,docIDHash);
+          }
+        }
+
+        // Now, assemble a result, and change the state of the records accordingly
+        DocumentDescription[] rval = new DocumentDescription[documentIDMap.size()];
+        boolean[] rvalBoolean = new boolean[documentIDMap.size()];
+        i = 0;
+        iter = documentIDMap.keySet().iterator();
+        while (iter.hasNext())
+        {
+          String docIDHash = (String)iter.next();
+          DocumentDescription dd = (DocumentDescription)documentIDMap.get(docIDHash);
+          // Determine whether we can delete it from the index or not
+          rvalBoolean[i] = (allowedDocIds.get(docIDHash) != null);
+          // Set the record status to "being cleaned" and return it
+          rval[i++] = dd;
+          jobQueue.setCleaningStatus(dd.getID());
+        }
+
+        if (Logging.perf.isDebugEnabled())
+          Logging.perf.debug("Done pruning unindexable docs after "+new Long(System.currentTimeMillis()-startTime).toString()+" ms.");
+
+        return new DocumentSetAndFlags(rval,rvalBoolean);
+
+      }
+      catch (Error e)
+      {
+        database.signalRollback();
+        throw e;
+      }
+      catch (ManifoldCFException e)
+      {
+        database.signalRollback();
+        if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+        {
+          if (Logging.perf.isDebugEnabled())
+            Logging.perf.debug("Aborted transaction finding cleanable docs: "+e.getMessage());
+          sleepAmt = getRandomAmount();
+          continue;
+        }
+        throw e;
+      }
+      finally
+      {
+        database.endTransaction();
+        sleepFor(sleepAmt);
+      }
+    }
+  }
+
   /** Get list of deletable document descriptions.  This list will take into account
   * multiple jobs that may own the same document.  All documents for which a description
   * is returned will be transitioned to the "beingdeleted" state.  Documents which are
@@ -789,9 +991,8 @@ public class JobManager implements IJobM
     // be fast rather than perfect.  The match we make is: hashvalue against hashvalue, and
     // different job id's.
     //
-    // SELECT id,jobid,docid FROM jobqueue t0 WHERE ((t0.status IN ('C','P','G') AND EXISTS(SELECT 'x' FROM
-    //      jobs t1 WHERE t0.jobid=t1.id AND t1.status='D')) OR (t0.status='P' AND EXISTS(SELECT 'x' FROM
-    //              jobs t3 WHERE t0.jobid=t3.id AND t3.status='X')))
+    // SELECT id,jobid,docid FROM jobqueue t0 WHERE (t0.status IN ('C','P','G') AND EXISTS(SELECT 'x' FROM
+    //      jobs t1 WHERE t0.jobid=t1.id AND t1.status='D')
     //      AND NOT EXISTS(SELECT 'x' FROM jobqueue t2 WHERE t0.hashval=t2.hashval AND t0.jobid!=t2.jobid
     //              AND t2.status IN ('A','F','B'))
     //
@@ -827,27 +1028,21 @@ public class JobManager implements IJobM
         
         list.add(jobs.statusToString(jobs.STATUS_READYFORDELETE));
         
-        list.add(jobQueue.statusToString(jobQueue.STATUS_PURGATORY));
-        
-        list.add(jobs.statusToString(jobs.STATUS_SHUTTINGDOWN));
-        
         list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVE));
         list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVEPURGATORY));
         list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCAN));
         list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCANPURGATORY));
         list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+        list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
         
         IResultSet set = database.performQuery("SELECT "+jobQueue.idField+","+jobQueue.jobIDField+","+jobQueue.docHashField+","+jobQueue.docIDField+","+
           jobQueue.failTimeField+","+jobQueue.failCountField+" FROM "+
-          jobQueue.getTableName()+" t0 WHERE ((t0."+jobQueue.statusField+" IN (?,?,?) "+
+          jobQueue.getTableName()+" t0 WHERE t0."+jobQueue.statusField+" IN (?,?,?) "+
           " AND EXISTS(SELECT 'x' FROM "+jobs.getTableName()+" t1 WHERE t0."+jobQueue.jobIDField+"=t1."+jobs.idField+
           " AND t1."+jobs.statusField+"=?"+
-          ")) OR (t0."+jobQueue.statusField+"=?"+
-          " AND EXISTS(SELECT 'x' FROM "+jobs.getTableName()+" t3 WHERE t0."+jobQueue.jobIDField+"=t3."+jobs.idField+
-          " AND t3."+jobs.statusField+"=?"+
-          "))) AND NOT EXISTS(SELECT 'x' FROM "+jobQueue.getTableName()+" t2 WHERE t0."+jobQueue.docHashField+"=t2."+
+          ") AND NOT EXISTS(SELECT 'x' FROM "+jobQueue.getTableName()+" t2 WHERE t0."+jobQueue.docHashField+"=t2."+
           jobQueue.docHashField+" AND t0."+jobQueue.jobIDField+"!=t2."+jobQueue.jobIDField+
-          " AND t2."+jobQueue.statusField+" IN (?,?,?,?,?)) "+database.constructOffsetLimitClause(0,maxCount),
+          " AND t2."+jobQueue.statusField+" IN (?,?,?,?,?,?)) "+database.constructOffsetLimitClause(0,maxCount),
           list,null,null,maxCount,null);
 
         if (Logging.perf.isDebugEnabled())
@@ -1329,6 +1524,7 @@ public class JobManager implements IJobM
     list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCAN));
     list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCANPURGATORY));
     list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+    list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
     
     StringBuffer sb = new StringBuffer("SELECT t0.");
     sb.append(jobQueue.idField).append(",t0.");
@@ -1346,7 +1542,7 @@ public class JobManager implements IJobM
     sb.append("NOT EXISTS(SELECT 'x' FROM ").append(jobQueue.getTableName()).append(" t2 WHERE t0.")
       .append(jobQueue.docHashField).append("=t2.").append(jobQueue.docHashField).append(" AND t0.")
       .append(jobQueue.jobIDField).append("!=t2.").append(jobQueue.jobIDField).append(" AND t2.")
-      .append(jobQueue.statusField).append(" IN (?,?,?,?,?))");
+      .append(jobQueue.statusField).append(" IN (?,?,?,?,?,?))");
     sb.append(" ").append(database.constructOffsetLimitClause(0,n));
 
     // Analyze jobqueue tables unconditionally, since it's become much more sensitive in 8.3 than it used to be.
@@ -1775,6 +1971,7 @@ public class JobManager implements IJobM
     list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCAN));
     list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCANPURGATORY));
     list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+    list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
     
     sb.append("t0.").append(jobQueue.checkTimeField).append("<=? AND ");
     sb.append("(t0.").append(jobQueue.checkActionField).append(" IS NULL OR t0.").append(jobQueue.checkActionField)
@@ -1785,7 +1982,7 @@ public class JobManager implements IJobM
     sb.append("NOT EXISTS(SELECT 'x' FROM ").append(jobQueue.getTableName()).append(" t2 WHERE t0.")
       .append(jobQueue.docHashField).append("=t2.").append(jobQueue.docHashField).append(" AND t0.")
       .append(jobQueue.jobIDField).append("!=t2.").append(jobQueue.jobIDField).append(" AND t2.")
-      .append(jobQueue.statusField).append(" IN (?,?,?,?,?)) AND ");
+      .append(jobQueue.statusField).append(" IN (?,?,?,?,?,?)) AND ");
 
     // Prerequisite event clause: AND NOT EXISTS(SELECT 'x' FROM prereqevents t3,events t4 WHERE t3.ownerid=t0.id AND t3.name=t4.name)
     sb.append("NOT EXISTS(SELECT 'x' FROM ").append(jobQueue.prereqEventManager.getTableName()).append(" t3,").append(eventManager.getTableName()).append(" t4 WHERE t0.")
@@ -2487,6 +2684,97 @@ public class JobManager implements IJobM
     }
   }
 
+  /** Reset a set of cleaning documents for further processing in the future.
+  * This method is called after some unknown number of the documents were cleaned, but then an ingestion service interruption occurred.
+  * Note well: The logic here basically presumes that we cannot know whether the documents were indeed cleaned or not.
+  * If we knew for a fact that none of the documents had been handled, it would be possible to look at the document's
+  * current status and decide what the new status ought to be, based on a true rollback scenario.  Such cases, however, are rare enough so that
+  * special logic is probably not worth it.
+  *@param documentDescriptions is the set of description objects for the documents that were being cleaned.
+  */
+  public void resetCleaningDocumentMultiple(DocumentDescription[] documentDescriptions)
+    throws ManifoldCFException
+  {
+    Long[] ids = new Long[documentDescriptions.length];
+    String[] docIDHashes = new String[documentDescriptions.length];
+
+    // First loop builds a "hash:jobid" key for each document and maps that key back to the document's index.
+    HashMap indexMap = new HashMap();
+    int i = 0;
+    while (i < documentDescriptions.length)
+    {
+      docIDHashes[i] =documentDescriptions[i].getDocumentIdentifierHash() + ":" + documentDescriptions[i].getJobID();
+      indexMap.put(docIDHashes[i],new Integer(i));
+      i++;
+    }
+
+    // Sort the keys, so that rows are always updated in a consistent order.
+    java.util.Arrays.sort(docIDHashes);
+
+    // Next loop populates the actual arrays we use to feed the operation so that the ordering is correct.
+    i = 0;
+    while (i < docIDHashes.length)
+    {
+      String docIDHash = docIDHashes[i];
+      Integer x = (Integer)indexMap.remove(docIDHash);
+      if (x == null)
+        throw new ManifoldCFException("Assertion failure: duplicate document identifier jobid/hash detected!");
+      int index = x.intValue();
+      ids[i] = documentDescriptions[index].getID();
+      i++;
+    }
+
+    // Documents get marked PURGATORY regardless of their current state; this is because we can't know at this point what the actual prior state was.
+    while (true)
+    {
+      long sleepAmt = 0L;
+      database.beginTransaction();
+      try
+      {
+        // Going through ids in order should greatly reduce or eliminate chances of deadlock occurring.  We thus need to pay attention to the sorted order.
+        i = 0;
+        while (i < ids.length)
+        {
+          jobQueue.setUncleaningStatus(ids[i]);
+          i++;
+        }
+
+        break;
+      }
+      catch (ManifoldCFException e)
+      {
+        database.signalRollback();
+        if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+        {
+          if (Logging.perf.isDebugEnabled())
+            Logging.perf.debug("Aborted transaction resetting cleaning documents: "+e.getMessage());
+          sleepAmt = getRandomAmount();
+          continue;
+        }
+        throw e;
+      }
+      catch (Error e)
+      {
+        database.signalRollback();
+        throw e;
+      }
+      finally
+      {
+        database.endTransaction();
+        sleepFor(sleepAmt);
+      }
+    }
+  }
+
+  /** Reset a single cleaning document for further processing; delegates to resetCleaningDocumentMultiple().
+  * This gets done when a cleaning thread sees a service interruption, etc., from the ingestion system.
+  */
+  public void resetCleaningDocument(DocumentDescription documentDescription)
+    throws ManifoldCFException
+  {
+    resetCleaningDocumentMultiple(new DocumentDescription[]{documentDescription});
+  }
+
   /** Reset a set of deleting documents for further processing in the future.
   * This method is called after some unknown number of the documents were deleted, but then an ingestion service interruption occurred.
   * Note well: The logic here basically presumes that we cannot know whether the documents were indeed processed or not.
@@ -5113,7 +5401,7 @@ public class JobManager implements IJobM
       // This method must find only jobs that have nothing hanging around in their jobqueue that represents an ingested
       // document.  Any jobqueue entries which are in a state to interfere with the delete will be cleaned up by other
       // threads, so eventually a job will become eligible.  This happens when there are no records that have an ingested
-      // status: complete, purgatory, being-deleted, or pending purgatory.
+      // status: complete, purgatory, being-cleaned, being-deleted, or pending purgatory.
       database.beginTransaction();
       try
       {
@@ -5152,12 +5440,15 @@ public class JobManager implements IJobM
           list.add(jobQueue.statusToString(jobQueue.STATUS_PENDINGPURGATORY));
           list.add(jobID);
           list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+          list.add(jobID);
+          list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
 
           IResultSet confirmSet = database.performQuery("SELECT "+jobQueue.idField+" FROM "+
             jobQueue.getTableName()+" WHERE "+
             "("+jobQueue.jobIDField+"=? AND "+jobQueue.statusField+"=?) OR "+
             "("+jobQueue.jobIDField+"=? AND "+jobQueue.statusField+"=?) OR "+
             "("+jobQueue.jobIDField+"=? AND "+jobQueue.statusField+"=?) OR "+
+            "("+jobQueue.jobIDField+"=? AND "+jobQueue.statusField+"=?) OR "+
             "("+jobQueue.jobIDField+"=? AND "+jobQueue.statusField+"=?) "+database.constructOffsetLimitClause(0,1),list,null,null,1,null);
 
           if (confirmSet.getRowCount() > 0)
@@ -5438,8 +5729,12 @@ public class JobManager implements IJobM
     }
   }
 
-  /** Reset eligible jobs back to "inactive" state.  This method is used to pick up all jobs in the shutting down state
-  * whose purgatory or being-deleted records have been all cleaned up.
+  /** Reset eligible jobs either back to the "inactive" state, or make them active again.  The
+  * latter will occur if the cleanup phase of the job generated more pending documents.
+  *
+  *  This method is used to pick up all jobs in the shutting down state
+  * whose purgatory or being-cleaned records have been all processed.
+  *
   *@param currentTime is the current time in milliseconds since epoch.
   *@param resetJobs is filled in with the set of IJobDescription objects that were reset.
   */
@@ -5477,7 +5772,7 @@ public class JobManager implements IJobM
           list.add(jobID);
           list.add(jobQueue.statusToString(jobQueue.STATUS_PURGATORY));
           list.add(jobID);
-          list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+          list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
 
           IResultSet confirmSet = database.performQuery("SELECT "+jobQueue.idField+" FROM "+
             jobQueue.getTableName()+" WHERE "+
@@ -5487,14 +5782,40 @@ public class JobManager implements IJobM
           if (confirmSet.getRowCount() > 0)
             continue;
 
-          IJobDescription jobDesc = jobs.load(jobID,true);
-          resetJobs.add(jobDesc);
-          
-          // Label the job "finished"
-          jobs.finishJob(jobID,currentTime);
-          if (Logging.jobs.isDebugEnabled())
+          // The shutting-down phase is complete.  However, we need to check if there are any outstanding
+          // PENDING or PENDINGPURGATORY records before we can decide what to do.
+          list.clear();
+          list.add(jobID);
+          list.add(jobQueue.statusToString(jobQueue.STATUS_PENDING));
+          list.add(jobID);
+          list.add(jobQueue.statusToString(jobQueue.STATUS_PENDINGPURGATORY));
+
+          confirmSet = database.performQuery("SELECT "+jobQueue.idField+" FROM "+
+            jobQueue.getTableName()+" WHERE "+
+            "("+jobQueue.jobIDField+"=? AND "+jobQueue.statusField+"=?) OR "+
+            "("+jobQueue.jobIDField+"=? AND "+jobQueue.statusField+"=?) "+database.constructOffsetLimitClause(0,1),list,null,null,1,null);
+
+          if (confirmSet.getRowCount() > 0)
           {
-            Logging.jobs.debug("Job "+jobID+" now completed");
+            // This job needs to re-enter the active state.  Make that happen.
+            jobs.returnJobToActive(jobID);
+            if (Logging.jobs.isDebugEnabled())
+            {
+              Logging.jobs.debug("Job "+jobID+" is re-entering active state");
+            }
+          }
+          else
+          {
+            // This job should be marked as finished.
+            IJobDescription jobDesc = jobs.load(jobID,true);
+            resetJobs.add(jobDesc);
+            
+            // Label the job "finished"
+            jobs.finishJob(jobID,currentTime);
+            if (Logging.jobs.isDebugEnabled())
+            {
+              Logging.jobs.debug("Job "+jobID+" now completed");
+            }
           }
         }
         return;
@@ -5804,6 +6125,7 @@ public class JobManager implements IJobM
     list.add(jobQueue.statusToString(jobQueue.STATUS_COMPLETE));
     list.add(jobQueue.statusToString(jobQueue.STATUS_PURGATORY));
     list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+    list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
     
     list.add(jobQueue.statusToString(jobQueue.STATUS_COMPLETE));
     list.add(jobQueue.statusToString(jobQueue.STATUS_PURGATORY));
@@ -5857,6 +6179,7 @@ public class JobManager implements IJobM
       .append(" WHEN ").append("t0.").append(jobQueue.statusField).append("=? THEN 'Processed'")
       .append(" WHEN ").append("t0.").append(jobQueue.statusField).append("=? THEN 'Processed'")
       .append(" WHEN ").append("t0.").append(jobQueue.statusField).append("=? THEN 'Being removed'")
+      .append(" WHEN ").append("t0.").append(jobQueue.statusField).append("=? THEN 'Being removed'")
       .append(" ELSE 'Unknown'")
       .append(" END AS state,")
       .append("CASE")
@@ -5982,6 +6305,7 @@ public class JobManager implements IJobM
     list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCANPURGATORY));
     
     list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+    list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
     
     list.add(jobQueue.actionToString(jobQueue.ACTION_RESCAN));
     list.add(jobQueue.statusToString(jobQueue.STATUS_PENDING));
@@ -6035,6 +6359,7 @@ public class JobManager implements IJobM
       .append("CASE")
       .append(" WHEN ")
       .append(jobQueue.statusField).append("=?")
+      .append(" OR ").append(jobQueue.statusField).append("=?")
       .append(" THEN 1 ELSE 0")
       .append(" END")
       .append(" as deleting,")
@@ -6177,9 +6502,10 @@ public class JobManager implements IJobM
         list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVEPURGATORY));
         list.add(jobQueue.statusToString(jobQueue.STATUS_ACTIVENEEDRESCANPURGATORY));
         list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
+        list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
         list.add(jobQueue.statusToString(jobQueue.STATUS_COMPLETE));
         list.add(jobQueue.statusToString(jobQueue.STATUS_PURGATORY));
-        sb.append(fieldPrefix).append(jobQueue.statusField).append(" IN (?,?,?,?,?,?)");
+        sb.append(fieldPrefix).append(jobQueue.statusField).append(" IN (?,?,?,?,?,?,?)");
         break;
       }
       k++;
@@ -6221,7 +6547,8 @@ public class JobManager implements IJobM
         break;
       case DOCSTATUS_DELETING:
         list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGDELETED));
-        sb.append(fieldPrefix).append(jobQueue.statusField).append("=?");
+        list.add(jobQueue.statusToString(jobQueue.STATUS_BEINGCLEANED));
+        sb.append(fieldPrefix).append(jobQueue.statusField).append(" IN (?,?)");
         break;
       case DOCSTATUS_READYFORPROCESSING:
         list.add(jobQueue.statusToString(jobQueue.STATUS_PENDING));

Modified: incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java?rev=1057136&r1=1057135&r2=1057136&view=diff
==============================================================================
--- incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java (original)
+++ incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java Mon Jan 10 09:24:00 2011
@@ -45,6 +45,7 @@ public class JobQueue extends org.apache
   public final static int STATUS_BEINGDELETED = 6;
   public final static int STATUS_ACTIVENEEDRESCAN = 7;
   public final static int STATUS_ACTIVENEEDRESCANPURGATORY = 8;
+  public final static int STATUS_BEINGCLEANED = 9;
 
   // Action values
   public final static int ACTION_RESCAN = 0;
@@ -66,9 +67,12 @@ public class JobQueue extends org.apache
   // an aborted job.  On recovery, PENDING and ACTIVE records are deleted (since they were never
   // completed), while PENDINGPURGATORY and ACTIVEPURGATORY records are retained but get marked as PURGATORY.
   //
-  // BEINGDELETED means that the document is queued to be cleaned up because the owning job is being
+  // BEINGDELETED means that the document is queued because the owning job is being
   //   deleted.  It exists so that jobs that are active can avoid processing a document until the cleanup
   //   activity is done.
+  //
+  // BEINGCLEANED means that the document is queued because the owning job is in the SHUTTINGDOWN
+  //   state, and the document was never encountered during the crawl.
 
   // Field names
   public static final String idField = "id";
@@ -98,6 +102,7 @@ public class JobQueue extends org.apache
     statusMap.put("D",new Integer(STATUS_BEINGDELETED));
     statusMap.put("a",new Integer(STATUS_ACTIVENEEDRESCAN));
     statusMap.put("f",new Integer(STATUS_ACTIVENEEDRESCANPURGATORY));
+    statusMap.put("d",new Integer(STATUS_BEINGCLEANED));
   }
 
   protected static Map seedstatusMap;
@@ -307,6 +312,12 @@ public class JobQueue extends org.apache
     list.add(statusToString(STATUS_BEINGDELETED));
     performUpdate(map,"WHERE "+statusField+"=?",list,null);
 
+    // Map BEINGCLEANED to PURGATORY
+    map.put(statusField,statusToString(STATUS_PURGATORY));
+    list.clear();
+    list.add(statusToString(STATUS_BEINGCLEANED));
+    performUpdate(map,"WHERE "+statusField+"=?",list,null);
+
     // Map newseed fields to seed
     map.put(isSeedField,seedstatusToString(SEEDSTATUS_SEED));
     list.clear();
@@ -376,6 +387,20 @@ public class JobQueue extends org.apache
     performUpdate(map,"WHERE "+statusField+"=?",list,null);
   }
 
+  /** Reset doc cleaning worker status: every record in BEINGCLEANED status is set back to PURGATORY.
+  */
+  public void resetDocCleanupWorkerStatus()
+    throws ManifoldCFException
+  {
+    HashMap map = new HashMap();
+    ArrayList list = new ArrayList();
+    // Map BEINGCLEANED to PURGATORY (for all jobs: the WHERE clause filters on status only)
+    map.put(statusField,statusToString(STATUS_PURGATORY));
+    list.clear(); // no-op here, since the list was created just above
+    list.add(statusToString(STATUS_BEINGCLEANED));
+    performUpdate(map,"WHERE "+statusField+"=?",list,null);
+  }
+
   /** Prepare for a "full scan" job.  This will not be called
   * unless the job is in the "INACTIVE" state.
   * This does the following:
@@ -642,6 +667,34 @@ public class JobQueue extends org.apache
     performUpdate(map,"WHERE "+idField+"=?",list,null);
   }
 
+  /** Set the status of the job queue record with the given id to "being cleaned" (STATUS_BEINGCLEANED).
+  *@param id is the job queue record id. */
+  public void setCleaningStatus(Long id)
+    throws ManifoldCFException
+  {
+    ArrayList list = new ArrayList();
+    list.add(id);
+    HashMap map = new HashMap();
+    map.put(statusField,statusToString(STATUS_BEINGCLEANED));
+    performUpdate(map,"WHERE "+idField+"=?",list,null);
+    noteModifications(0,1,0); // NOTE(review): setUncleaningStatus makes no such call -- confirm the asymmetry is intended
+  }
+
+  /** Set a document to be "no longer cleaning": status becomes PURGATORY, and the check time/action and fail time/count fields are set to null. */
+  public void setUncleaningStatus(Long id)
+    throws ManifoldCFException
+  {
+    HashMap map = new HashMap();
+    map.put(statusField,statusToString(STATUS_PURGATORY));
+    map.put(checkTimeField,null);
+    map.put(checkActionField,null);
+    map.put(failTimeField,null);
+    map.put(failCountField,null);
+    ArrayList list = new ArrayList();
+    list.add(id);
+    performUpdate(map,"WHERE "+idField+"=?",list,null);
+  }
+
   /** Remove multiple records entirely.
   *@param ids is the set of job queue id's
   */
@@ -715,12 +768,14 @@ public class JobQueue extends org.apache
     case STATUS_ACTIVEPURGATORY:
     case STATUS_ACTIVENEEDRESCAN:
     case STATUS_ACTIVENEEDRESCANPURGATORY:
+    case STATUS_BEINGCLEANED:
+      // These are all the active states.  Being in this state implies that a thread may be working on the document.  We
+      // must not interrupt it.
       // Initial adds never bring along any carrydown info, so we should be satisfied as long as the record exists.
       break;
 
     case STATUS_COMPLETE:
     case STATUS_PURGATORY:
-    case STATUS_BEINGDELETED:
       // Set the status and time both
       map.put(statusField,statusToString(STATUS_PENDINGPURGATORY));
       if (desiredExecuteTime == -1L)
@@ -996,6 +1051,7 @@ public class JobQueue extends org.apache
       rval = true;
       break;
     case STATUS_COMPLETE:
+    case STATUS_BEINGCLEANED:
       // Requeue the document for processing, if there have been other changes.
       if (otherChangesSeen)
       {
@@ -1222,6 +1278,8 @@ public class JobQueue extends org.apache
       return "a";
     case STATUS_ACTIVENEEDRESCANPURGATORY:
       return "f";
+    case STATUS_BEINGCLEANED:
+      return "d";
     default:
       throw new ManifoldCFException("Bad status value: "+Integer.toString(status));
     }

Modified: incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java?rev=1057136&r1=1057135&r2=1057136&view=diff
==============================================================================
--- incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java (original)
+++ incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java Mon Jan 10 09:24:00 2011
@@ -1115,6 +1115,67 @@ public class Jobs extends org.apache.man
     performUpdate(map,"WHERE "+idField+"=?",list,new StringSet(getJobStatusKey()));
   }
 
+  /** Put job back into an active state, from the shutting-down state, inside its own transaction.
+  *@param jobID is the job identifier.
+  *@throws ManifoldCFException if the job cannot be found or is not in the SHUTTINGDOWN state. */
+  public void returnJobToActive(Long jobID)
+    throws ManifoldCFException
+  {
+    beginTransaction();
+    try
+    {
+      ArrayList list = new ArrayList();
+      list.add(jobID);
+      IResultSet set = performQuery("SELECT "+statusField+","+connectionNameField+","+outputNameField+" FROM "+getTableName()+" WHERE "+
+        idField+"=? FOR UPDATE",list,null,null); // lock the job row until this transaction ends
+      if (set.getRowCount() == 0)
+        throw new ManifoldCFException("Can't find job "+jobID.toString());
+      IResultRow row = set.getRow(0);
+      int status = stringToStatus((String)row.getValue(statusField));
+      int newStatus;
+      switch (status)
+      {
+      case STATUS_SHUTTINGDOWN: // pick the ACTIVE variant from whether the connection's and the output's connectors exist
+        if (connectionMgr.checkConnectorExists((String)row.getValue(connectionNameField)))
+        {
+          if (outputMgr.checkConnectorExists((String)row.getValue(outputNameField)))
+            newStatus = STATUS_ACTIVE;
+          else
+            newStatus = STATUS_ACTIVE_NOOUTPUT;
+        }
+        else
+        {
+          if (outputMgr.checkConnectorExists((String)row.getValue(outputNameField)))
+            newStatus = STATUS_ACTIVE_UNINSTALLED;
+          else
+            newStatus = STATUS_ACTIVE_NEITHER;
+        }
+        break;
+      default:
+        // Only a SHUTTINGDOWN job may be returned to active; any other status is an error.
+        throw new ManifoldCFException("Unexpected job status encountered: "+Integer.toString(status));
+      }
+
+      HashMap map = new HashMap();
+      map.put(statusField,statusToString(newStatus));
+      performUpdate(map,"WHERE "+idField+"=?",list,new StringSet(getJobStatusKey()));
+    }
+    catch (ManifoldCFException e)
+    {
+      signalRollback();
+      throw e;
+    }
+    catch (Error e)
+    {
+      signalRollback();
+      throw e;
+    }
+    finally
+    {
+      endTransaction();
+    }
+  }
+  
   /** Make job active, and set the start time field.
   *@param jobID is the job identifier.
   *@param startTime is the current time in milliseconds from start of epoch.
@@ -1763,8 +1824,8 @@ public class Jobs extends org.apache.man
 
   }
 
-  /** Return true if there is a job in either the READYFORDELETE state or the
-  * SHUTTINGDOWN state.  (This matches the conditions for values to be returned from
+  /** Return true if there is a job in the READYFORDELETE state.  (This matches the
+  * conditions for values to be returned from
   * getNextDeletableDocuments).
   *@return true if such jobs exist.
   */
@@ -1773,9 +1834,24 @@ public class Jobs extends org.apache.man
   {
     ArrayList list = new ArrayList();
     list.add(statusToString(STATUS_READYFORDELETE));
+    IResultSet set = performQuery("SELECT "+idField+" FROM "+getTableName()+" WHERE "+
+      statusField+"=? "+constructOffsetLimitClause(0,1),
+      list,new StringSet(getJobStatusKey()),null,1);
+    return set.getRowCount() > 0;
+  }
+
+  /** Return true if there is a job in the
+  * SHUTTINGDOWN state.  (This matches the conditions for values to be returned from
+  * getNextCleanableDocuments).
+  *@return true if such jobs exist.
+  */
+  public boolean cleaningJobsPresent()
+    throws ManifoldCFException
+  {
+    ArrayList list = new ArrayList();
     list.add(statusToString(STATUS_SHUTTINGDOWN));
     IResultSet set = performQuery("SELECT "+idField+" FROM "+getTableName()+" WHERE "+
-      statusField+" IN (?,?) "+constructOffsetLimitClause(0,1),
+      statusField+"=? "+constructOffsetLimitClause(0,1),
       list,new StringSet(getJobStatusKey()),null,1);
     return set.getRowCount() > 0;
   }

Modified: incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java
URL: http://svn.apache.org/viewvc/incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java?rev=1057136&r1=1057135&r2=1057136&view=diff
==============================================================================
--- incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java (original)
+++ incubator/lcf/branches/release-0.1-incubating-branch/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java Mon Jan 10 09:24:00 2011
@@ -46,6 +46,8 @@ public class ManifoldCF extends org.apac
   protected static ExpireThread[] expireThreads = null;
   protected static DocumentDeleteStufferThread deleteStufferThread = null;
   protected static DocumentDeleteThread[] deleteThreads = null;
+  protected static DocumentCleanupStufferThread cleanupStufferThread = null;
+  protected static DocumentCleanupThread[] cleanupThreads = null;
   protected static JobResetThread jobResetThread = null;
   protected static SeedingThread seedingThread = null;
   protected static IdleCleanupThread idleCleanupThread = null;
@@ -56,11 +58,15 @@ public class ManifoldCF extends org.apac
   protected static WorkerResetManager workerResetManager = null;
   /** Delete thread pool reset manager */
   protected static DocDeleteResetManager docDeleteResetManager = null;
+  /** Cleanup thread pool reset manager */
+  protected static DocCleanupResetManager docCleanupResetManager = null;
 
   // Number of worker threads
   protected static int numWorkerThreads = 0;
   // Number of delete threads
   protected static int numDeleteThreads = 0;
+  // Number of cleanup threads
+  protected static int numCleanupThreads = 0;
   // Number of expiration threads
   protected static int numExpireThreads = 0;
   // Factor for low water level in queueing
@@ -70,6 +76,7 @@ public class ManifoldCF extends org.apac
 
   protected static final String workerThreadCountProperty = "org.apache.manifoldcf.crawler.threads";
   protected static final String deleteThreadCountProperty = "org.apache.manifoldcf.crawler.deletethreads";
+  protected static final String cleanupThreadCountProperty = "org.apache.manifoldcf.crawler.cleanupthreads";
   protected static final String expireThreadCountProperty = "org.apache.manifoldcf.crawler.expirethreads";
   protected static final String lowWaterFactorProperty = "org.apache.manifoldcf.crawler.lowwaterfactor";
   protected static final String stuffAmtFactorProperty = "org.apache.manifoldcf.crawler.stuffamountfactor";
@@ -150,12 +157,18 @@ public class ManifoldCF extends org.apac
       String maxDeleteThreads = getProperty(deleteThreadCountProperty);
       if (maxDeleteThreads == null)
         maxDeleteThreads = "10";
+      String maxCleanupThreads = getProperty(cleanupThreadCountProperty);
+      if (maxCleanupThreads == null)
+        maxCleanupThreads = "10";
       String maxExpireThreads = getProperty(expireThreadCountProperty);
       if (maxExpireThreads == null)
         maxExpireThreads = "10";
       numDeleteThreads = new Integer(maxDeleteThreads).intValue();
       if (numDeleteThreads < 1 || numDeleteThreads > 300)
         throw new ManifoldCFException("Illegal value for the number of delete threads");
+      numCleanupThreads = new Integer(maxCleanupThreads).intValue();
+      if (numCleanupThreads < 1 || numCleanupThreads > 300)
+        throw new ManifoldCFException("Illegal value for the number of cleanup threads");
       numExpireThreads = new Integer(maxExpireThreads).intValue();
       if (numExpireThreads < 1 || numExpireThreads > 300)
         throw new ManifoldCFException("Illegal value for the number of expire threads");
@@ -180,12 +193,14 @@ public class ManifoldCF extends org.apac
 
       DocumentQueue documentQueue = new DocumentQueue();
       DocumentDeleteQueue documentDeleteQueue = new DocumentDeleteQueue();
+      DocumentCleanupQueue documentCleanupQueue = new DocumentCleanupQueue();
       DocumentDeleteQueue expireQueue = new DocumentDeleteQueue();
 
       BlockingDocuments blockingDocuments = new BlockingDocuments();
 
       workerResetManager = new WorkerResetManager(documentQueue);
       docDeleteResetManager = new DocDeleteResetManager(documentDeleteQueue);
+      docCleanupResetManager = new DocCleanupResetManager(documentCleanupQueue);
 
       jobStartThread = new JobStartThread();
       startupThread = new StartupThread(queueTracker);
@@ -220,6 +235,16 @@ public class ManifoldCF extends org.apac
         deleteThreads[i] = new DocumentDeleteThread(Integer.toString(i),documentDeleteQueue,docDeleteResetManager);
         i++;
       }
+      
+      cleanupStufferThread = new DocumentCleanupStufferThread(documentCleanupQueue,numCleanupThreads,docCleanupResetManager);
+      cleanupThreads = new DocumentCleanupThread[numCleanupThreads];
+      i = 0;
+      while (i < numCleanupThreads)
+      {
+        cleanupThreads[i] = new DocumentCleanupThread(Integer.toString(i),documentCleanupQueue,queueTracker,docCleanupResetManager);
+        i++;
+      }
+
       jobResetThread = new JobResetThread(queueTracker);
       seedingThread = new SeedingThread(queueTracker);
       idleCleanupThread = new IdleCleanupThread();
@@ -311,6 +336,14 @@ public class ManifoldCF extends org.apac
           i++;
         }
 
+        cleanupStufferThread.start();
+        i = 0;
+        while (i < numCleanupThreads)
+        {
+          cleanupThreads[i].start();
+          i++;
+        }
+
         deleteStufferThread.start();
         i = 0;
         while (i < numDeleteThreads)
@@ -351,6 +384,7 @@ public class ManifoldCF extends org.apac
       while (initializationThread != null || jobDeleteThread != null || startupThread != null || jobStartThread != null || stufferThread != null ||
         finisherThread != null || notificationThread != null || workerThreads != null || expireStufferThread != null | expireThreads != null ||
         deleteStufferThread != null || deleteThreads != null ||
+        cleanupStufferThread != null || cleanupThreads != null ||
         jobResetThread != null || seedingThread != null || idleCleanupThread != null || setPriorityThread != null)
       {
         // Send an interrupt to all threads that are still there.
@@ -412,6 +446,20 @@ public class ManifoldCF extends org.apac
               expireThread.interrupt();
           }
         }
+        if (cleanupStufferThread != null)
+        {
+          cleanupStufferThread.interrupt();
+        }
+        if (cleanupThreads != null)
+        {
+          int i = 0;
+          while (i < cleanupThreads.length)
+          {
+            Thread cleanupThread = cleanupThreads[i++];
+            if (cleanupThread != null)
+              cleanupThread.interrupt();
+          }
+        }
         if (deleteStufferThread != null)
         {
           deleteStufferThread.interrupt();
@@ -534,6 +582,31 @@ public class ManifoldCF extends org.apac
             expireThreads = null;
         }
 
+        if (cleanupStufferThread != null)
+        {
+          if (!cleanupStufferThread.isAlive())
+            cleanupStufferThread = null;
+        }
+        if (cleanupThreads != null)
+        {
+          int i = 0;
+          boolean isAlive = false;
+          while (i < cleanupThreads.length)
+          {
+            Thread cleanupThread = cleanupThreads[i];
+            if (cleanupThread != null)
+            {
+              if (!cleanupThread.isAlive())
+                cleanupThreads[i] = null;
+              else
+                isAlive = true;
+            }
+            i++;
+          }
+          if (!isAlive)
+            cleanupThreads = null;
+        }
+
         if (deleteStufferThread != null)
         {
           if (!deleteStufferThread.isAlive())