Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2011/01/11 15:30:02 UTC

svn commit: r1057659 - in /incubator/lcf/trunk: ./ framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/ framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/ framework/pull-agent/src/main/java/org/apache/manifol...

Author: kwright
Date: Tue Jan 11 14:30:01 2011
New Revision: 1057659

URL: http://svn.apache.org/viewvc?rev=1057659&view=rev
Log:
Fix for CONNECTORS-149.

Modified:
    incubator/lcf/trunk/CHANGES.txt
    incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
    incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
    incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireStufferThread.java
    incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java
    incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java
    incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerResetManager.java

Modified: incubator/lcf/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/CHANGES.txt?rev=1057659&r1=1057658&r2=1057659&view=diff
==============================================================================
--- incubator/lcf/trunk/CHANGES.txt (original)
+++ incubator/lcf/trunk/CHANGES.txt Tue Jan 11 14:30:01 2011
@@ -14,6 +14,12 @@ forget the index state for an output con
 ======================= Release 0.1 =======================
 Release Date:  See http://incubator.apache.org/connectors for the official release date.
 
+CONNECTORS-149: Expire threads did not obey rules as far as deleting documents
+belonging to other jobs.  Also, the test for whether a document was shared between
+jobs did not take the output connection into account.  Finally, I found an infinite
+loop in the job delete stuffer thread code.
+(Karl Wright)
+
 CONNECTORS-147: Disable PDF's everywhere except for user documentation.
 (Grant Ingersoll, David Crossley, Karl Wright)
 

Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java?rev=1057659&r1=1057658&r2=1057659&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java Tue Jan 11 14:30:01 2011
@@ -210,7 +210,7 @@ public interface IJobManager
   *@param currentTime is the current time.
   *@return the array of document descriptions to expire.
   */
-  public DocumentDescription[] getExpiredDocuments(int n, long currentTime)
+  public DocumentSetAndFlags getExpiredDocuments(int n, long currentTime)
     throws ManifoldCFException;
 
   // This method supports the "queue stuffer" thread
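
The new DocumentSetAndFlags return type pairs the expired documents with a parallel
boolean array that says, for each document, whether it may also be removed from the
index.  A minimal sketch of the holder class this interface now relies on, inferred
from the constructor and accessors used elsewhere in this commit (the actual class
may differ):

  public class DocumentSetAndFlags
  {
    protected final DocumentDescription[] documentSet;
    protected final boolean[] flags;

    public DocumentSetAndFlags(DocumentDescription[] documentSet, boolean[] flags)
    {
      this.documentSet = documentSet;
      this.flags = flags;
    }

    /** The documents to be expired. */
    public DocumentDescription[] getDocumentSet()
    {
      return documentSet;
    }

    /** For each document, true if it should also be deleted from the index. */
    public boolean[] getFlags()
    {
      return flags;
    }
  }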

Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1057659&r1=1057658&r2=1057659&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Tue Jan 11 14:30:01 2011
@@ -852,7 +852,7 @@ public class JobManager implements IJobM
         if (Logging.perf.isDebugEnabled())
           Logging.perf.debug("Done getting docs to cleaning queue after "+new Long(System.currentTimeMillis()-startTime).toString()+" ms.");
 
-        // We need to organize the returned set by connection name, so that we can efficiently
+        // We need to organize the returned set by connection name and output connection name, so that we can efficiently
         // use  getUnindexableDocumentIdentifiers.
         // This is a table keyed by connection name and containing an ArrayList, which in turn contains DocumentDescription
         // objects.
@@ -880,15 +880,23 @@ public class JobManager implements IJobM
             failCount = (int)failCountValue.longValue();
           IJobDescription jobDesc = load(jobID);
           String connectionName = jobDesc.getConnectionName();
+          String outputConnectionName = jobDesc.getOutputConnectionName();
           DocumentDescription dd = new DocumentDescription((Long)row.getValue(jobQueue.idField),
             jobID,documentIDHash,documentID,failTime,failCount);
-          documentIDMap.put(documentIDHash,dd);
-          ArrayList x = (ArrayList)connectionNameMap.get(connectionName);
+          String compositeDocumentID = makeCompositeID(documentIDHash,connectionName);
+          documentIDMap.put(compositeDocumentID,dd);
+          Map y = (Map)connectionNameMap.get(connectionName);
+          if (y == null)
+          {
+            y = new HashMap();
+            connectionNameMap.put(connectionName,y);
+          }
+          ArrayList x = (ArrayList)y.get(outputConnectionName);
           if (x == null)
           {
             // New entry needed
             x = new ArrayList();
-            connectionNameMap.put(connectionName,x);
+            y.put(outputConnectionName,x);
           }
           x.add(dd);
           i++;
@@ -902,35 +910,52 @@ public class JobManager implements IJobM
         while (iter.hasNext())
         {
           String connectionName = (String)iter.next();
-          ArrayList x = (ArrayList)connectionNameMap.get(connectionName);
-          // Do the filter query
-          DocumentDescription[] descriptions = new DocumentDescription[x.size()];
-          int j = 0;
-          while (j < descriptions.length)
-          {
-            descriptions[j] = (DocumentDescription)x.get(j);
-            j++;
-          }
-          String[] docIDHashes = getUnindexableDocumentIdentifiers(descriptions,connectionName);
-          j = 0;
-          while (j < docIDHashes.length)
-          {
-            String docIDHash = docIDHashes[j++];
-            allowedDocIds.put(docIDHash,docIDHash);
+          Map y = (Map)connectionNameMap.get(connectionName);
+          Iterator outputIter = y.keySet().iterator();
+          while (outputIter.hasNext())
+          {
+            String outputConnectionName = (String)outputIter.next();
+            ArrayList x = (ArrayList)y.get(outputConnectionName);
+            // Do the filter query
+            DocumentDescription[] descriptions = new DocumentDescription[x.size()];
+            int j = 0;
+            while (j < descriptions.length)
+            {
+              descriptions[j] = (DocumentDescription)x.get(j);
+              j++;
+            }
+            String[] docIDHashes = getUnindexableDocumentIdentifiers(descriptions,connectionName,outputConnectionName);
+            j = 0;
+            while (j < docIDHashes.length)
+            {
+              String docIDHash = docIDHashes[j++];
+              String key = makeCompositeID(docIDHash,connectionName);
+              allowedDocIds.put(key,docIDHash);
+            }
           }
         }
 
         // Now, assemble a result, and change the state of the records accordingly
-        DocumentDescription[] rval = new DocumentDescription[documentIDMap.size()];
-        boolean[] rvalBoolean = new boolean[documentIDMap.size()];
+        // First thing to do is order by document hash, so we reduce the risk of deadlock.
+        String[] compositeIDArray = new String[documentIDMap.size()];
         i = 0;
         iter = documentIDMap.keySet().iterator();
         while (iter.hasNext())
         {
-          String docIDHash = (String)iter.next();
-          DocumentDescription dd = (DocumentDescription)documentIDMap.get(docIDHash);
+          compositeIDArray[i++] = (String)iter.next();
+        }
+        
+        java.util.Arrays.sort(compositeIDArray);
+        
+        DocumentDescription[] rval = new DocumentDescription[documentIDMap.size()];
+        boolean[] rvalBoolean = new boolean[documentIDMap.size()];
+        i = 0;
+        while (i < compositeIDArray.length)
+        {
+          String compositeDocID = compositeIDArray[i];
+          DocumentDescription dd = (DocumentDescription)documentIDMap.get(compositeDocID);
           // Determine whether we can delete it from the index or not
-          rvalBoolean[i] = (allowedDocIds.get(docIDHash) != null);
+          rvalBoolean[i] = (allowedDocIds.get(compositeDocID) != null);
           // Set the record status to "being cleaned" and return it
           rval[i++] = dd;
           jobQueue.setCleaningStatus(dd.getID());
@@ -967,6 +992,14 @@ public class JobManager implements IJobM
     }
   }
 
+  /** Create a composite document hash key.  This consists of the document id hash plus the
+  * connection name.
+  */
+  protected static String makeCompositeID(String docIDHash, String connectionName)
+  {
+    return docIDHash + ":" + connectionName;
+  }
+  
   /** Get list of deletable document descriptions.  This list will take into account
   * multiple jobs that may own the same document.  All documents for which a description
   * is returned will be transitioned to the "beingdeleted" state.  Documents which are
@@ -1076,15 +1109,23 @@ public class JobManager implements IJobM
             failCount = (int)failCountValue.longValue();
           IJobDescription jobDesc = load(jobID);
           String connectionName = jobDesc.getConnectionName();
+          String outputConnectionName = jobDesc.getOutputConnectionName();
           DocumentDescription dd = new DocumentDescription((Long)row.getValue(jobQueue.idField),
             jobID,documentIDHash,documentID,failTime,failCount);
-          documentIDMap.put(documentIDHash,dd);
-          ArrayList x = (ArrayList)connectionNameMap.get(connectionName);
+          String compositeDocumentID = makeCompositeID(documentIDHash,connectionName);
+          documentIDMap.put(compositeDocumentID,dd);
+          Map y = (Map)connectionNameMap.get(connectionName);
+          if (y == null)
+          {
+            y = new HashMap();
+            connectionNameMap.put(connectionName,y);
+          }
+          ArrayList x = (ArrayList)y.get(outputConnectionName);
           if (x == null)
           {
             // New entry needed
             x = new ArrayList();
-            connectionNameMap.put(connectionName,x);
+            y.put(outputConnectionName,x);
           }
           x.add(dd);
           i++;
@@ -1098,33 +1139,51 @@ public class JobManager implements IJobM
         while (iter.hasNext())
         {
           String connectionName = (String)iter.next();
-          ArrayList x = (ArrayList)connectionNameMap.get(connectionName);
-          // Do the filter query
-          DocumentDescription[] descriptions = new DocumentDescription[x.size()];
-          int j = 0;
-          while (j < descriptions.length)
-          {
-            descriptions[j] = (DocumentDescription)x.get(j);
-            j++;
-          }
-          String[] docIDHashes = getUnindexableDocumentIdentifiers(descriptions,connectionName);
-          j = 0;
-          while (j < docIDHashes.length)
-          {
-            String docIDHash = docIDHashes[j++];
-            allowedDocIds.put(docIDHash,docIDHash);
+          Map y = (Map)connectionNameMap.get(connectionName);
+          Iterator outputIter = y.keySet().iterator();
+          while (outputIter.hasNext())
+          {
+            String outputConnectionName = (String)outputIter.next();
+            ArrayList x = (ArrayList)y.get(outputConnectionName);
+            // Do the filter query
+            DocumentDescription[] descriptions = new DocumentDescription[x.size()];
+            int j = 0;
+            while (j < descriptions.length)
+            {
+              descriptions[j] = (DocumentDescription)x.get(j);
+              j++;
+            }
+            String[] docIDHashes = getUnindexableDocumentIdentifiers(descriptions,connectionName,outputConnectionName);
+            j = 0;
+            while (j < docIDHashes.length)
+            {
+              String docIDHash = docIDHashes[j++];
+              String key = makeCompositeID(docIDHash,connectionName);
+              allowedDocIds.put(key,docIDHash);
+            }
           }
         }
 
         // Now, assemble a result, and change the state of the records accordingly
-        DocumentDescription[] rval = new DocumentDescription[allowedDocIds.size()];
+        // First thing to do is order by document hash to reduce chances of deadlock.
+        String[] compositeIDArray = new String[documentIDMap.size()];
         i = 0;
         iter = documentIDMap.keySet().iterator();
         while (iter.hasNext())
         {
-          String docIDHash = (String)iter.next();
-          DocumentDescription dd = (DocumentDescription)documentIDMap.get(docIDHash);
-          if (allowedDocIds.get(docIDHash) == null)
+          compositeIDArray[i++] = (String)iter.next();
+        }
+        
+        java.util.Arrays.sort(compositeIDArray);
+        
+        DocumentDescription[] rval = new DocumentDescription[allowedDocIds.size()];
+        int j = 0;
+        i = 0;
+        while (i < compositeIDArray.length)
+        {
+          String compositeDocumentID = compositeIDArray[i];
+          DocumentDescription dd = (DocumentDescription)documentIDMap.get(compositeDocumentID);
+          if (allowedDocIds.get(compositeDocumentID) == null)
           {
             // Delete this record and do NOT return it.
             jobQueue.deleteRecord(dd.getID());
@@ -1140,9 +1199,10 @@ public class JobManager implements IJobM
           else
           {
             // Set the record status to "being deleted" and return it
-            rval[i++] = dd;
+            rval[j++] = dd;
             jobQueue.setDeletingStatus(dd.getID());
           }
+          i++;
         }
 
         if (Logging.perf.isDebugEnabled())
@@ -1177,13 +1237,14 @@ public class JobManager implements IJobM
   }
 
   /** Get a list of document identifiers that should actually be deleted from the index, from a list that
-  * might contain identifiers that are shared with other jobs.  The input list is guaranteed to be
-  * smaller in size than maxInClauseCount for the database.
+  * might contain identifiers that are shared with other jobs, which are targeted to the same output connection.
+  * The input list is guaranteed to be smaller in size than maxInClauseCount for the database.
   *@param documentIdentifiers is the set of document identifiers to consider.
   *@param connectionName is the connection name for ALL the document identifiers.
+  *@param outputConnectionName is the output connection name for ALL the document identifiers.
   *@return the set of documents which should be removed from the index.
   */
-  protected String[] getUnindexableDocumentIdentifiers(DocumentDescription[] documentIdentifiers, String connectionName)
+  protected String[] getUnindexableDocumentIdentifiers(DocumentDescription[] documentIdentifiers, String connectionName, String outputConnectionName)
     throws ManifoldCFException
   {
     // This is where we will count the individual document id's
@@ -1233,8 +1294,9 @@ public class JobManager implements IJobM
     // will never be removed from the index.
     //
     // Instead, the only solution is to not queue a document for any activity that is inconsistent with activities
-    // that may already be ongoing for that document.  For this reason, I have introduced a "BEING_DELETED" state
-    // for a document.  This state will allow the various queries that queue up activities to avoid documents that
+    // that may already be ongoing for that document.  For this reason, I have introduced a "BEING_DELETED"
+    // and "BEING_CLEANED" state
+    // for a document.  These states will allow the various queries that queue up activities to avoid documents that
     // are currently being processed elsewhere.
 
     list.add(jobQueue.statusToString(jobQueue.STATUS_PURGATORY));
@@ -1242,11 +1304,13 @@ public class JobManager implements IJobM
     list.add(jobQueue.statusToString(jobQueue.STATUS_COMPLETE));
     
     list.add(connectionName);
+    list.add(outputConnectionName);
     
     sb.append(") AND t0.").append(jobQueue.statusField).append(" IN (?,?,?) AND EXISTS(SELECT 'x' FROM ")
       .append(jobs.getTableName()).append(" t1 WHERE t0.")
       .append(jobQueue.jobIDField).append("=t1.").append(jobs.idField).append(" AND t1.")
-      .append(jobs.connectionNameField).append("=?)");
+      .append(jobs.connectionNameField).append("=? AND t1.")
+      .append(jobs.outputNameField).append("=?)");
 
     // Do the query, and then count the number of times each document identifier occurs.
     IResultSet results = database.performQuery(sb.toString(),list,null,null);
@@ -1489,14 +1553,14 @@ public class JobManager implements IJobM
   *@param currentTime is the current time.
   *@return the array of document descriptions to expire.
   */
-  public DocumentDescription[] getExpiredDocuments(int n, long currentTime)
+  public DocumentSetAndFlags getExpiredDocuments(int n, long currentTime)
     throws ManifoldCFException
   {
     // Screening query
     // Moved outside of transaction, so there's less chance of keeping jobstatus cache key tied up
     // for an extended period of time.
     if (!jobs.activeJobsPresent())
-      return new DocumentDescription[0];
+      return new DocumentSetAndFlags(new DocumentDescription[0], new boolean[0]);
 
     long startTime = 0L;
     if (Logging.perf.isDebugEnabled())
@@ -1572,21 +1636,21 @@ public class JobManager implements IJobM
 
         // To avoid deadlock, we want to update the document id hashes in order.  This means reading into a structure I can sort by docid hash,
         // before updating any rows in jobqueue.
-        String[] docIDHashes = new String[set.getRowCount()];
-        Map storageMap = new HashMap();
+        HashMap connectionNameMap = new HashMap();
+        HashMap documentIDMap = new HashMap();
         Map statusMap = new HashMap();
 
         int i = 0;
         while (i < set.getRowCount())
         {
           IResultRow row = set.getRow(i);
-          Long id = (Long)row.getValue(jobQueue.idField);
           Long jobID = (Long)row.getValue(jobQueue.jobIDField);
-          String docIDHash = (String)row.getValue(jobQueue.docHashField);
-          String docID = (String)row.getValue(jobQueue.docIDField);
+          String documentIDHash = (String)row.getValue(jobQueue.docHashField);
+          String documentID = (String)row.getValue(jobQueue.docIDField);
           int status = jobQueue.stringToStatus(row.getValue(jobQueue.statusField).toString());
           Long failTimeValue = (Long)row.getValue(jobQueue.failTimeField);
           Long failCountValue = (Long)row.getValue(jobQueue.failCountField);
+          // Failtime is probably not useful in this context, but we'll bring it along for completeness
           long failTime;
           if (failTimeValue == null)
             failTime = -1L;
@@ -1594,35 +1658,95 @@ public class JobManager implements IJobM
             failTime = failTimeValue.longValue();
           int failCount;
           if (failCountValue == null)
-            failCount = -1;
+            failCount = 0;
           else
             failCount = (int)failCountValue.longValue();
-
-          DocumentDescription dd = new DocumentDescription(id,jobID,docIDHash,docID,failTime,failCount);
-          docIDHashes[i] = docIDHash + ":" +jobID;
-          storageMap.put(docIDHashes[i],dd);
-          statusMap.put(docIDHashes[i],new Integer(status));
+          IJobDescription jobDesc = load(jobID);
+          String connectionName = jobDesc.getConnectionName();
+          String outputConnectionName = jobDesc.getOutputConnectionName();
+          DocumentDescription dd = new DocumentDescription((Long)row.getValue(jobQueue.idField),
+            jobID,documentIDHash,documentID,failTime,failCount);
+          String compositeDocumentID = makeCompositeID(documentIDHash,connectionName);
+          documentIDMap.put(compositeDocumentID,dd);
+          statusMap.put(compositeDocumentID,new Integer(status));
+          Map y = (Map)connectionNameMap.get(connectionName);
+          if (y == null)
+          {
+            y = new HashMap();
+            connectionNameMap.put(connectionName,y);
+          }
+          ArrayList x = (ArrayList)y.get(outputConnectionName);
+          if (x == null)
+          {
+            // New entry needed
+            x = new ArrayList();
+            y.put(outputConnectionName,x);
+          }
+          x.add(dd);
           i++;
         }
 
-        // No duplicates are possible here
-        java.util.Arrays.sort(docIDHashes);
+        // For each bin, obtain a filtered answer, and enter all answers into a hash table.
+        // We'll then scan the result again to look up the right descriptions for return,
+        // and delete the ones that are owned multiply.
+        HashMap allowedDocIds = new HashMap();
+        Iterator iter = connectionNameMap.keySet().iterator();
+        while (iter.hasNext())
+        {
+          String connectionName = (String)iter.next();
+          Map y = (Map)connectionNameMap.get(connectionName);
+          Iterator outputIter = y.keySet().iterator();
+          while (outputIter.hasNext())
+          {
+            String outputConnectionName = (String)outputIter.next();
+            ArrayList x = (ArrayList)y.get(outputConnectionName);
+            // Do the filter query
+            DocumentDescription[] descriptions = new DocumentDescription[x.size()];
+            int j = 0;
+            while (j < descriptions.length)
+            {
+              descriptions[j] = (DocumentDescription)x.get(j);
+              j++;
+            }
+            String[] docIDHashes = getUnindexableDocumentIdentifiers(descriptions,connectionName,outputConnectionName);
+            j = 0;
+            while (j < docIDHashes.length)
+            {
+              String docIDHash = docIDHashes[j++];
+              String key = makeCompositeID(docIDHash,connectionName);
+              allowedDocIds.put(key,docIDHash);
+            }
+          }
+        }
 
+        // Now, assemble a result, and change the state of the records accordingly
+        // First thing to do is order by document hash, so we reduce the risk of deadlock.
+        String[] compositeIDArray = new String[documentIDMap.size()];
         i = 0;
-        while (i < docIDHashes.length)
+        iter = documentIDMap.keySet().iterator();
+        while (iter.hasNext())
         {
-          String docIDHash = docIDHashes[i];
-          DocumentDescription dd = (DocumentDescription)storageMap.get(docIDHash);
-          Long id = dd.getID();
-          int status = ((Integer)statusMap.get(docIDHash)).intValue();
-
-          // Set status to "ACTIVE".
-          jobQueue.updateActiveRecord(id,status);
+          compositeIDArray[i++] = (String)iter.next();
+        }
+        
+        java.util.Arrays.sort(compositeIDArray);
+        
+        DocumentDescription[] rval = new DocumentDescription[documentIDMap.size()];
+        boolean[] rvalBoolean = new boolean[documentIDMap.size()];
+        i = 0;
+        while (i < compositeIDArray.length)
+        {
+          String compositeDocID = compositeIDArray[i];
+          DocumentDescription dd = (DocumentDescription)documentIDMap.get(compositeDocID);
+          // Determine whether we can delete it from the index or not
+          rvalBoolean[i] = (allowedDocIds.get(compositeDocID) != null);
+          // Set the record status to "being cleaned" and return it
+          rval[i++] = dd;
+          jobQueue.updateActiveRecord(dd.getID(),((Integer)statusMap.get(compositeDocID)).intValue());
+        }
 
-          answers.add(dd);
+        return new DocumentSetAndFlags(rval, rvalBoolean);
 
-          i++;
-        }
       }
       catch (ManifoldCFException e)
       {
@@ -1647,14 +1771,6 @@ public class JobManager implements IJobM
         sleepFor(sleepAmt);
       }
 
-      DocumentDescription[] rval = new DocumentDescription[answers.size()];
-      int k = 0;
-      while (k < rval.length)
-      {
-        rval[k] = (DocumentDescription)answers.get(k);
-        k++;
-      }
-      return rval;
     }
   }
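
The reworked queueing methods above (cleanup, delete, and expiration stuffing) now share
one pattern: each jobqueue row is keyed by makeCompositeID(docIDHash, connectionName),
rows are bucketed first by connection name and then by output connection name so that
getUnindexableDocumentIdentifiers() runs exactly once per (connection, output connection)
pair, and the composite keys are sorted before any row status is updated so concurrent
threads touch rows in a consistent order.  A small self-contained sketch of the bucketing
and ordering steps, with hypothetical connection names and hashes (the per-method status
bookkeeping is omitted):

  import java.util.ArrayList;
  import java.util.HashMap;
  import java.util.Iterator;
  import java.util.Map;

  public class BucketingSketch
  {
    // Same construction as JobManager.makeCompositeID().
    static String makeCompositeID(String docIDHash, String connectionName)
    {
      return docIDHash + ":" + connectionName;
    }

    public static void main(String[] args)
    {
      // Hypothetical rows: { docIDHash, connection name, output connection name }
      String[][] rows = {
        {"00a1", "webcrawl",  "solr-main"},
        {"00a2", "webcrawl",  "solr-main"},
        {"00a3", "webcrawl",  "solr-backup"},
        {"00b7", "fileshare", "solr-main"}
      };

      HashMap connectionNameMap = new HashMap();  // connection name -> (output connection name -> ArrayList)
      HashMap documentIDMap = new HashMap();      // composite id -> row

      int i = 0;
      while (i < rows.length)
      {
        String[] row = rows[i++];
        documentIDMap.put(makeCompositeID(row[0],row[1]),row);
        Map y = (Map)connectionNameMap.get(row[1]);
        if (y == null)
        {
          y = new HashMap();
          connectionNameMap.put(row[1],y);
        }
        ArrayList x = (ArrayList)y.get(row[2]);
        if (x == null)
        {
          x = new ArrayList();
          y.put(row[2],x);
        }
        x.add(row);
      }

      // One pass per (connection name, output connection name) pair; this is where the
      // real code issues one getUnindexableDocumentIdentifiers() filter query per pair.
      Iterator iter = connectionNameMap.keySet().iterator();
      while (iter.hasNext())
      {
        String connectionName = (String)iter.next();
        Map y = (Map)connectionNameMap.get(connectionName);
        Iterator outputIter = y.keySet().iterator();
        while (outputIter.hasNext())
        {
          String outputConnectionName = (String)outputIter.next();
          ArrayList x = (ArrayList)y.get(outputConnectionName);
          System.out.println(connectionName+" / "+outputConnectionName+": "+x.size()+" document(s)");
        }
      }

      // Composite keys are sorted before jobqueue rows are updated, so every thread
      // touches rows in the same order; this reduces the chance of deadlock.
      String[] compositeIDArray = new String[documentIDMap.size()];
      i = 0;
      iter = documentIDMap.keySet().iterator();
      while (iter.hasNext())
        compositeIDArray[i++] = (String)iter.next();
      java.util.Arrays.sort(compositeIDArray);
    }
  }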
 

Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireStufferThread.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireStufferThread.java?rev=1057659&r1=1057658&r2=1057659&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireStufferThread.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireStufferThread.java Tue Jan 11 14:30:01 2011
@@ -34,7 +34,7 @@ public class ExpireStufferThread extends
 
   // Local data
   // This is a reference to the static main document expiration queue
-  protected DocumentDeleteQueue documentQueue;
+  protected DocumentCleanupQueue documentQueue;
   /** Worker thread pool reset manager */
   protected WorkerResetManager resetManager;
   // This is the number of entries we want to stuff at any one time.
@@ -45,7 +45,7 @@ public class ExpireStufferThread extends
   *@param n represents the number of threads that will be processing queued stuff, NOT the
   * number of documents to be done at once!
   */
-  public ExpireStufferThread(DocumentDeleteQueue documentQueue, int n, WorkerResetManager resetManager)
+  public ExpireStufferThread(DocumentCleanupQueue documentQueue, int n, WorkerResetManager resetManager)
     throws ManifoldCFException
   {
     super();
@@ -115,8 +115,10 @@ public class ExpireStufferThread extends
           // The number n passed in here thus cannot be used in a query to limit the number of returned
           // results.  Instead, it must be factored into the limit portion of the query.
           long currentTime = System.currentTimeMillis();
-          DocumentDescription[] descs = jobManager.getExpiredDocuments(deleteChunkSize,currentTime);
-
+          DocumentSetAndFlags docsAndFlags = jobManager.getExpiredDocuments(deleteChunkSize,currentTime);
+          DocumentDescription[] descs = docsAndFlags.getDocumentSet();
+          boolean[] deleteFromIndex = docsAndFlags.getFlags();
+          
           if (Thread.currentThread().isInterrupted())
             throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
 
@@ -134,14 +136,14 @@ public class ExpireStufferThread extends
           }
 
           // Do the stuffing
-          DeleteQueuedDocument[] docDescs = new DeleteQueuedDocument[descs.length];
+          CleanupQueuedDocument[] docDescs = new CleanupQueuedDocument[descs.length];
           int k = 0;
           while (k < docDescs.length)
           {
-            docDescs[k] = new DeleteQueuedDocument(descs[k]);
+            docDescs[k] = new CleanupQueuedDocument(descs[k],deleteFromIndex[k]);
             k++;
           }
-          DocumentDeleteSet set = new DocumentDeleteSet(docDescs);
+          DocumentCleanupSet set = new DocumentCleanupSet(docDescs);
           documentQueue.addDocuments(set);
 
           // If we don't wait here, the other threads don't seem to have a chance to queue anything else up.
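
The expiration stuffer now wraps each expired document in a CleanupQueuedDocument that
carries the per-document "delete from index" flag returned by getExpiredDocuments().
A sketch of the shape this commit appears to rely on, inferred from the constructor call
above and the shouldBeRemovedFromIndex() calls in ExpireThread below (the real class may
extend the existing DeleteQueuedDocument differently or carry more state):

  public class CleanupQueuedDocument extends DeleteQueuedDocument
  {
    /** True if the document should also be removed from the index. */
    protected final boolean deleteFromIndex;

    public CleanupQueuedDocument(DocumentDescription documentDescription, boolean deleteFromIndex)
    {
      super(documentDescription);
      this.deleteFromIndex = deleteFromIndex;
    }

    public boolean shouldBeRemovedFromIndex()
    {
      return deleteFromIndex;
    }
  }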

Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java?rev=1057659&r1=1057658&r2=1057659&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java Tue Jan 11 14:30:01 2011
@@ -35,7 +35,7 @@ public class ExpireThread extends Thread
   // Local data
   protected String id;
   // This is a reference to the static main document queue
-  protected DocumentDeleteQueue documentQueue;
+  protected DocumentCleanupQueue documentQueue;
   /** Worker thread pool reset manager */
   protected WorkerResetManager resetManager;
   /** Queue tracker */
@@ -44,7 +44,7 @@ public class ExpireThread extends Thread
   /** Constructor.
   *@param id is the expire thread id.
   */
-  public ExpireThread(String id, DocumentDeleteQueue documentQueue, QueueTracker queueTracker, WorkerResetManager resetManager)
+  public ExpireThread(String id, DocumentCleanupQueue documentQueue, QueueTracker queueTracker, WorkerResetManager resetManager)
     throws ManifoldCFException
   {
     super();
@@ -86,7 +86,7 @@ public class ExpireThread extends Thread
           // we update its status, even if there is an exception!!!
 
           // See if there is anything on the queue for me
-          DocumentDeleteSet dds = documentQueue.getDocuments();
+          DocumentCleanupSet dds = documentQueue.getDocuments();
           if (dds == null)
             // It's a reset, so recycle
             continue;
@@ -104,7 +104,7 @@ public class ExpireThread extends Thread
             int j = 0;
             while (j < dds.getCount())
             {
-              DeleteQueuedDocument dqd = dds.getDocument(j++);
+              CleanupQueuedDocument dqd = dds.getDocument(j++);
               DocumentDescription ddd = dqd.getDocumentDescription();
               Long jobID = ddd.getJobID();
               IJobDescription job = jobManager.load(jobID,true);
@@ -138,7 +138,7 @@ public class ExpireThread extends Thread
               j = 0;
               while (j < list.size())
               {
-                DeleteQueuedDocument dqd = (DeleteQueuedDocument)list.get(j);
+                CleanupQueuedDocument dqd = (CleanupQueuedDocument)list.get(j);
                 DocumentDescription ddd = dqd.getDocumentDescription();
                 Long jobID = ddd.getJobID();
                 IJobDescription job = jobManager.load(jobID,true);
@@ -188,16 +188,32 @@ public class ExpireThread extends Thread
                   {
                     String outputConnectionName = (String)outputIterator.next();
                     ArrayList indexList = (ArrayList)outputMap.get(outputConnectionName);
-                    String[] docClassesToRemove = new String[indexList.size()];
-                    String[] hashedDocsToRemove = new String[indexList.size()];
+                    // Count the number of docs to actually delete.  This will be a subset of the documents in the list.
+                    int k = 0;
+                    int removeCount = 0;
+                    while (k < indexList.size())
+                    {
+                      int index = ((Integer)indexList.get(k++)).intValue();
+                      if (((CleanupQueuedDocument)arrayDocsToDelete.get(index)).shouldBeRemovedFromIndex())
+                        removeCount++;
+                    }
+                    
+                    // Allocate removal arrays
+                    String[] docClassesToRemove = new String[removeCount];
+                    String[] hashedDocsToRemove = new String[removeCount];
 
                     // Now, iterate over the index list
-                    int k = 0;
+                    k = 0;
+                    removeCount = 0;
                     while (k < indexList.size())
                     {
                       int index = ((Integer)indexList.get(k)).intValue();
-                      docClassesToRemove[k] = (String)arrayDocClasses.get(index);
-                      hashedDocsToRemove[k] = (String)arrayDocHashes.get(index);
+                      if (((CleanupQueuedDocument)arrayDocsToDelete.get(index)).shouldBeRemovedFromIndex())
+                      {
+                        docClassesToRemove[removeCount] = (String)arrayDocClasses.get(index);
+                        hashedDocsToRemove[removeCount] = (String)arrayDocHashes.get(index);
+                        removeCount++;
+                      }
                       k++;
                     }
 
@@ -235,7 +251,7 @@ public class ExpireThread extends Thread
                     {
                       int index = ((Integer)indexList.get(k)).intValue();
 
-                      DeleteQueuedDocument dqd = (DeleteQueuedDocument)arrayDocsToDelete.get(index);
+                      CleanupQueuedDocument dqd = (CleanupQueuedDocument)arrayDocsToDelete.get(index);
                       DocumentDescription ddd = dqd.getDocumentDescription();
                       Long jobID = ddd.getJobID();
                       int hopcountMethod = ((Integer)hopcountMethods.get(index)).intValue();
@@ -287,7 +303,7 @@ public class ExpireThread extends Thread
             int j = 0;
             while (j < dds.getCount())
             {
-              DeleteQueuedDocument dqd = dds.getDocument(j);
+              CleanupQueuedDocument dqd = dds.getDocument(j);
               if (dqd.wasProcessed() == false)
               {
                 DocumentDescription ddd = dqd.getDocumentDescription();
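
With the cleanup flag in place, the index-removal arrays above are sized to just the
flagged subset: one pass counts the documents whose shouldBeRemovedFromIndex() flag is
set, a second pass fills docClassesToRemove/hashedDocsToRemove to exactly that size, and
the unflagged (shared) documents still flow through the rest of the queue processing
without touching the index.  A minimal sketch of that count-then-fill step, using
hypothetical data in place of the CleanupQueuedDocument list:

  boolean[] shouldRemove = { true, false, true };
  String[] docHashes = { "00a1", "00a2", "00a3" };

  // First pass: count the flagged documents.
  int removeCount = 0;
  int k = 0;
  while (k < shouldRemove.length)
  {
    if (shouldRemove[k])
      removeCount++;
    k++;
  }

  // Second pass: fill an array of exactly that size.
  String[] hashedDocsToRemove = new String[removeCount];
  removeCount = 0;
  k = 0;
  while (k < shouldRemove.length)
  {
    if (shouldRemove[k])
      hashedDocsToRemove[removeCount++] = docHashes[k];
    k++;
  }
  // hashedDocsToRemove now holds { "00a1", "00a3" }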

Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java?rev=1057659&r1=1057658&r2=1057659&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java Tue Jan 11 14:30:01 2011
@@ -194,11 +194,11 @@ public class ManifoldCF extends org.apac
       DocumentQueue documentQueue = new DocumentQueue();
       DocumentDeleteQueue documentDeleteQueue = new DocumentDeleteQueue();
       DocumentCleanupQueue documentCleanupQueue = new DocumentCleanupQueue();
-      DocumentDeleteQueue expireQueue = new DocumentDeleteQueue();
+      DocumentCleanupQueue expireQueue = new DocumentCleanupQueue();
 
       BlockingDocuments blockingDocuments = new BlockingDocuments();
 
-      workerResetManager = new WorkerResetManager(documentQueue);
+      workerResetManager = new WorkerResetManager(documentQueue,expireQueue);
       docDeleteResetManager = new DocDeleteResetManager(documentDeleteQueue);
       docCleanupResetManager = new DocCleanupResetManager(documentCleanupQueue);
 

Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerResetManager.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerResetManager.java?rev=1057659&r1=1057658&r2=1057659&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerResetManager.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerResetManager.java Tue Jan 11 14:30:01 2011
@@ -32,12 +32,15 @@ public class WorkerResetManager extends 
 
   /** The document queue */
   protected DocumentQueue dq;
+  /** The expiration queue */
+  protected DocumentCleanupQueue eq;
 
   /** Constructor. */
-  public WorkerResetManager(DocumentQueue dq)
+  public WorkerResetManager(DocumentQueue dq, DocumentCleanupQueue eq)
   {
     super();
     this.dq = dq;
+    this.eq = eq;
   }
 
   /** Reset */
@@ -47,6 +50,7 @@ public class WorkerResetManager extends 
     IJobManager jobManager = JobManagerFactory.make(tc);
     jobManager.resetDocumentWorkerStatus();
     dq.clear();
+    eq.clear();
   }
 }