Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2012/08/13 02:44:06 UTC

svn commit: r1372225 [1/2] - in /manifoldcf/trunk: ./ framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/ framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/ framework/pull-agent/src/main/java/org/apache/mani...

Author: kwright
Date: Mon Aug 13 00:44:05 2012
New Revision: 1372225

URL: http://svn.apache.org/viewvc?rev=1372225&view=rev
Log:
Fix for CONNECTORS-501.

Modified:
    manifoldcf/trunk/   (props changed)
    manifoldcf/trunk/CHANGES.txt
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopCount.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/IntrinsicLink.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
    manifoldcf/trunk/tests/filesystem/src/test/java/org/apache/manifoldcf/filesystem_tests/HopcountTester.java
    manifoldcf/trunk/tests/webcrawler/src/test/java/org/apache/manifoldcf/webcrawler_tests/BigCrawlTester.java
    manifoldcf/trunk/tests/webcrawler/src/test/java/org/apache/manifoldcf/webcrawler_tests/MockWebService.java

Propchange: manifoldcf/trunk/
------------------------------------------------------------------------------
  Merged /manifoldcf/branches/CONNECTORS-501:r1370450-1372223

Modified: manifoldcf/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/CHANGES.txt?rev=1372225&r1=1372224&r2=1372225&view=diff
==============================================================================
--- manifoldcf/trunk/CHANGES.txt (original)
+++ manifoldcf/trunk/CHANGES.txt Mon Aug 13 00:44:05 2012
@@ -3,6 +3,11 @@ $Id$
 
 ======================= 0.7-dev =====================
 
+CONNECTORS-501: Fix hopcount logic to return a deterministic
+number of documents.  A number of race conditions were discovered
+and corrected.
+(Shigeki Kobayashi, Karl Wright)
+
 CONNECTORS-497: Add Lists support to the SharePoint connector.
 (Ahmet Arslan, Karl Wright)
 

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java?rev=1372225&r1=1372224&r2=1372225&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java Mon Aug 13 00:44:05 2012
@@ -274,9 +274,13 @@ public interface IJobManager
   public void markDocumentCompleted(DocumentDescription documentDescription)
     throws ManifoldCFException;
 
-  /** Note deletion as result of document processing by a job thread of a document.
+  /** Delete from queue as a result of processing of an active document.
+  * The document is expected to be in one of the active states: ACTIVE, ACTIVESEEDING,
+  * ACTIVENEEDSRESCAN, ACTIVESEEDINGNEEDSRESCAN.  The RESCAN variants are interpreted
+  * as meaning that the document should not be deleted, but should instead be popped back on the queue for
+  * a repeat processing attempt.
   *@param documentDescriptions are the set of description objects for the documents that were processed.
-  *@param hopcountMethod is one of complete, partial, or nevercomplete.
+  *@param hopcountMethod describes how to handle deletions for hopcount purposes.
   *@return the set of documents for which carrydown data was changed by this operation.  These documents are likely
   *  to be requeued as a result of the change.
   */
@@ -284,9 +288,13 @@ public interface IJobManager
     int hopcountMethod)
     throws ManifoldCFException;
 
-  /** Note deletion as result of document processing by a job thread of a document.
+  /** Delete from queue as a result of processing of an active document.
+  * The document is expected to be in one of the active states: ACTIVE, ACTIVESEEDING,
+  * ACTIVENEEDSRESCAN, ACTIVESEEDINGNEEDSRESCAN.  The RESCAN variants are interpreted
+  * as meaning that the document should not be deleted, but should instead be popped back on the queue for
+  * a repeat processing attempt.
   *@param documentDescription is the description object for the document that was processed.
-  *@param hopcountMethod is one of complete, partial, or nevercomplete.
+  *@param hopcountMethod describes how to handle deletions for hopcount purposes.
   *@return the set of documents for which carrydown data was changed by this operation.  These documents are likely
   *  to be requeued as a result of the change.
   */
@@ -294,6 +302,84 @@ public interface IJobManager
     int hopcountMethod)
     throws ManifoldCFException;
 
+  /** Mark hopcount removal from queue as a result of processing of an active document.
+  * The document is expected to be in one of the active states: ACTIVE, ACTIVESEEDING,
+  * ACTIVENEEDSRESCAN, ACTIVESEEDINGNEEDSRESCAN.  The RESCAN variants are interpreted
+  * as meaning that the document should not be marked as removed, but should instead be popped back on the queue for
+  * a repeat processing attempt.
+  *@param documentDescriptions are the set of description objects for the documents that were processed.
+  *@param hopcountMethod describes how to handle deletions for hopcount purposes.
+  *@return the set of documents for which carrydown data was changed by this operation.  These documents are likely
+  *  to be requeued as a result of the change.
+  */
+  public DocumentDescription[] markDocumentHopcountRemovalMultiple(Long jobID, String[] legalLinkTypes, DocumentDescription[] documentDescriptions,
+    int hopcountMethod)
+    throws ManifoldCFException;
+
+  /** Mark hopcount removal from queue as a result of processing of an active document.
+  * The document is expected to be in one of the active states: ACTIVE, ACTIVESEEDING,
+  * ACTIVENEEDSRESCAN, ACTIVESEEDINGNEEDSRESCAN.  The RESCAN variants are interpreted
+  * as meaning that the document should not be marked as removed, but should instead be popped back on the queue for
+  * a repeat processing attempt.
+  *@param documentDescription is the description object for the document that was processed.
+  *@param hopcountMethod describes how to handle deletions for hopcount purposes.
+  *@return the set of documents for which carrydown data was changed by this operation.  These documents are likely
+  *  to be requeued as a result of the change.
+  */
+  public DocumentDescription[] markDocumentHopcountRemoval(Long jobID, String[] legalLinkTypes, DocumentDescription documentDescription,
+    int hopcountMethod)
+    throws ManifoldCFException;
+
+  /** Delete from queue as a result of expiration of an active document.
+  * The document is expected to be in one of the active states: ACTIVE, ACTIVESEEDING,
+  * ACTIVENEEDSRESCAN, ACTIVESEEDINGNEEDSRESCAN.  Since the document expired,
+  * no special activity takes place as a result of the document being in a RESCAN state.
+  *@param documentDescriptions are the set of description objects for the documents that were processed.
+  *@param hopcountMethod describes how to handle deletions for hopcount purposes.
+  *@return the set of documents for which carrydown data was changed by this operation.  These documents are likely
+  *  to be requeued as a result of the change.
+  */
+  public DocumentDescription[] markDocumentExpiredMultiple(Long jobID, String[] legalLinkTypes, DocumentDescription[] documentDescriptions,
+    int hopcountMethod)
+    throws ManifoldCFException;
+  
+  /** Delete from queue as a result of expiration of an active document.
+  * The document is expected to be in one of the active states: ACTIVE, ACTIVESEEDING,
+  * ACTIVENEEDSRESCAN, ACTIVESEEDINGNEEDSRESCAN.  Since the document expired,
+  * no special activity takes place as a result of the document being in a RESCAN state.
+  *@param documentDescription is the description object for the document that was processed.
+  *@param hopcountMethod describes how to handle deletions for hopcount purposes.
+  *@return the set of documents for which carrydown data was changed by this operation.  These documents are likely
+  *  to be requeued as a result of the change.
+  */
+  public DocumentDescription[] markDocumentExpired(Long jobID, String[] legalLinkTypes, DocumentDescription documentDescription,
+    int hopcountMethod)
+    throws ManifoldCFException;
+
+  /** Delete from queue as a result of cleaning up an unreachable document.
+  * The document is expected to be in the PURGATORY state.  There is never any need to reprocess the
+  * document.
+  *@param documentDescriptions are the set of description objects for the documents that were processed.
+  *@param hopcountMethod describes how to handle deletions for hopcount purposes.
+  *@return the set of documents for which carrydown data was changed by this operation.  These documents are likely
+  *  to be requeued as a result of the change.
+  */
+  public DocumentDescription[] markDocumentCleanedUpMultiple(Long jobID, String[] legalLinkTypes, DocumentDescription[] documentDescriptions,
+    int hopcountMethod)
+    throws ManifoldCFException;
+
+  /** Delete from queue as a result of cleaning up an unreachable document.
+  * The document is expected to be in the PURGATORY state.  There is never any need to reprocess the
+  * document.
+  *@param documentDescription is the description object for the document that was processed.
+  *@param hopcountMethod describes how to handle deletions for hopcount purposes.
+  *@return the set of documents for which carrydown data was changed by this operation.  These documents are likely
+  *  to be requeued as a result of the change.
+  */
+  public DocumentDescription[] markDocumentCleanedUp(Long jobID, String[] legalLinkTypes, DocumentDescription documentDescription,
+    int hopcountMethod)
+    throws ManifoldCFException;
+
   /** Requeue a document set because of carrydown changes.
   * This method is called when carrydown data is modified for a set of documents.  The documents must be requeued for immediate reprocessing, even to the
   * extent that if one is *already* being processed, it will need to be done over again.

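For illustration, the four deletion-related entry points above differ only in the queue state they expect and in how the RESCAN variants are treated. Below is a minimal sketch of how a caller might dispatch among them; the DeletionReason enum and chooseDeletion() wrapper are hypothetical, and the import paths are assumed, while the IJobManager methods themselves are the ones declared above:

    import org.apache.manifoldcf.core.interfaces.ManifoldCFException;
    import org.apache.manifoldcf.crawler.interfaces.DocumentDescription;
    import org.apache.manifoldcf.crawler.interfaces.IJobManager;

    public class DeletionDispatchSketch
    {
      // Hypothetical reason codes; not part of the ManifoldCF API.
      enum DeletionReason { PROCESSING_DELETE, HOPCOUNT_EXCEEDED, EXPIRED, UNREACHABLE }

      static DocumentDescription[] chooseDeletion(IJobManager jobManager, Long jobID,
        String[] legalLinkTypes, DocumentDescription dd, int hopcountMethod,
        DeletionReason reason)
        throws ManifoldCFException
      {
        switch (reason)
        {
        case HOPCOUNT_EXCEEDED:
          // Active RESCAN variants are requeued rather than marked removed.
          return jobManager.markDocumentHopcountRemoval(jobID,legalLinkTypes,dd,hopcountMethod);
        case EXPIRED:
          // Expiration ignores the RESCAN distinction entirely.
          return jobManager.markDocumentExpired(jobID,legalLinkTypes,dd,hopcountMethod);
        case UNREACHABLE:
          // Document is expected to be in PURGATORY; plain removal.
          return jobManager.markDocumentCleanedUp(jobID,legalLinkTypes,dd,hopcountMethod);
        default:
          return jobManager.markDocumentDeleted(jobID,legalLinkTypes,dd,hopcountMethod);
        }
      }
    }

In every case the returned DocumentDescription[] lists documents whose carrydown data changed, which the caller then hands to the common requeuing logic.
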
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopCount.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopCount.java?rev=1372225&r1=1372224&r2=1372225&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopCount.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopCount.java Mon Aug 13 00:44:05 2012
@@ -364,20 +364,20 @@ public class HopCount extends org.apache
 
   /** Record a reference from source to target.  This reference will be marked as "new" or "existing".
   */
-  public void recordReference(Long jobID, String[] legalLinkTypes, String sourceDocumentIDHash, String targetDocumentIDHash, String linkType,
+  public boolean recordReference(Long jobID, String[] legalLinkTypes, String sourceDocumentIDHash, String targetDocumentIDHash, String linkType,
     int hopcountMethod)
     throws ManifoldCFException
   {
-    doRecord(jobID,legalLinkTypes,sourceDocumentIDHash,new String[]{targetDocumentIDHash},linkType,hopcountMethod);
+    return doRecord(jobID,legalLinkTypes,sourceDocumentIDHash,new String[]{targetDocumentIDHash},linkType,hopcountMethod)[0];
   }
 
   /** Record a set of references from source to target.  This reference will be marked as "new" or "existing".
   */
-  public void recordReferences(Long jobID, String[] legalLinkTypes, String sourceDocumentIDHash, String[] targetDocumentIDHashes, String linkType,
+  public boolean[] recordReferences(Long jobID, String[] legalLinkTypes, String sourceDocumentIDHash, String[] targetDocumentIDHashes, String linkType,
     int hopcountMethod)
     throws ManifoldCFException
   {
-    doRecord(jobID,legalLinkTypes,sourceDocumentIDHash,targetDocumentIDHashes,linkType,hopcountMethod);
+    return doRecord(jobID,legalLinkTypes,sourceDocumentIDHash,targetDocumentIDHashes,linkType,hopcountMethod);
   }
 
   /** Complete a recalculation pass for a set of source documents.  All child links that are not marked as "new"
@@ -390,13 +390,19 @@ public class HopCount extends org.apache
   }
 
   /** Do the work of recording source-target references. */
-  protected void doRecord(Long jobID, String[] legalLinkTypes, String sourceDocumentIDHash, String[] targetDocumentIDHashes, String linkType,
+  protected boolean[] doRecord(Long jobID, String[] legalLinkTypes, String sourceDocumentIDHash, String[] targetDocumentIDHashes, String linkType,
     int hopcountMethod)
     throws ManifoldCFException
   {
 
     // We have to both add the reference, AND invalidate appropriate cached hopcounts (if it is a NEW
     // link.)
+    boolean[] rval = new boolean[targetDocumentIDHashes.length];
+    for (int i = 0; i < rval.length; i++)
+    {
+      rval[i] = false;
+    }
+    
     beginTransaction();
     try
     {
@@ -404,6 +410,19 @@ public class HopCount extends org.apache
       if (newReferences.length > 0)
       {
         // There are added links.
+        
+        // First, note them in return value
+        Set<String> newSet = new HashSet<String>();
+        for (int i = 0; i < newReferences.length; i++)
+        {
+          newSet.add(newReferences[i]);
+        }
+        for (int i = 0; i < rval.length; i++)
+        {
+          if (newSet.contains(targetDocumentIDHashes[i]) &&
+            (sourceDocumentIDHash==null || !sourceDocumentIDHash.equals(targetDocumentIDHashes[i])))
+            rval[i] = true;
+        }
 
         // The add causes hopcount records to be queued for processing (and created if they don't exist).
         // ALL the hopcount records for the target document ids must be queued, for all the link types
@@ -437,10 +456,9 @@ public class HopCount extends org.apache
 
         if (sourceDocumentIDHash == null || sourceDocumentIDHash.length() == 0)
         {
-          int i = 0;
-          while (i < estimates.length)
+          for (int i = 0; i < estimates.length; i++)
           {
-            estimates[i++] = new Answer(0);
+            estimates[i] = new Answer(0);
           }
         }
         else
@@ -459,19 +477,16 @@ public class HopCount extends org.apache
             new MultiClause(linkTypeField,legalLinkTypes)}));
 
           IResultSet set = performQuery(sb.toString(),list,null,null);
-          HashMap answerMap = new HashMap();
-          int i = 0;
-          while (i < estimates.length)
+          Map<String,Answer> answerMap = new HashMap<String,Answer>();
+          for (int i = 0; i < estimates.length; i++)
           {
             estimates[i] = new Answer(ANSWER_INFINITY);
             answerMap.put(legalLinkTypes[i],estimates[i]);
-            i++;
           }
 
-          i = 0;
-          while (i < set.getRowCount())
+          for (int i = 0; i < set.getRowCount(); i++)
           {
-            IResultRow row = set.getRow(i++);
+            IResultRow row = set.getRow(i);
             Long id = (Long)row.getValue(idField);
             DeleteDependency[] dds;
             if (hopcountMethod != IJobDescription.HOPCOUNT_NEVERDELETE)
@@ -480,7 +495,7 @@ public class HopCount extends org.apache
               dds = new DeleteDependency[0];
             Long distance = (Long)row.getValue(distanceField);
             String recordedLinkType = (String)row.getValue(linkTypeField);
-            Answer a = (Answer)answerMap.get(recordedLinkType);
+            Answer a = answerMap.get(recordedLinkType);
             int recordedDistance = (int)distance.longValue();
             if (recordedDistance != -1)
             {
@@ -495,6 +510,7 @@ public class HopCount extends org.apache
         if (Logging.hopcount.isDebugEnabled())
           Logging.hopcount.debug("Done queueing "+Integer.toString(targetDocumentIDHashes.length)+" documents");
       }
+      return rval;
     }
     catch (ManifoldCFException e)
     {
@@ -571,13 +587,11 @@ public class HopCount extends org.apache
       //
       // ... and then, re-evaluate all hopcount records and their dependencies that are marked for delete.
       //
-      // But, the trick is that both source and target links must go away!!  So deleting a document is very different than
-      // updating a link...
+
 
       // This also removes the links themselves...
       if (hopcountMethod == IJobDescription.HOPCOUNT_ACCURATE)
         doDeleteDocuments(jobID,documentHashes);
-      
 
     }
     catch (ManifoldCFException e)
@@ -733,7 +747,7 @@ public class HopCount extends org.apache
   
   /** Limited find for missing records.
   */
-  protected void performFindMissingRecords(Long jobID, String[] affectedLinkTypes, ArrayList list, Map matchMap)
+  protected void performFindMissingRecords(Long jobID, String[] affectedLinkTypes, ArrayList list, Map<Question,Long> matchMap)
     throws ManifoldCFException
   {
     ArrayList newList = new ArrayList();
@@ -796,19 +810,16 @@ public class HopCount extends org.apache
     if (Logging.hopcount.isDebugEnabled())
     {
       Logging.hopcount.debug("Adding "+Integer.toString(documentIDHashes.length)+" documents to processing queue");
-      int z = 0;
-      while (z < documentIDHashes.length)
+      for (int z = 0; z < documentIDHashes.length; z++)
       {
-        Logging.hopcount.debug("  Adding '"+documentIDHashes[z++]+"' to processing queue");
+        Logging.hopcount.debug("  Adding '"+documentIDHashes[z]+"' to processing queue");
       }
       Logging.hopcount.debug("The source id is '"+sourceDocumentIDHash+"' and linktype is '"+linkType+"', and there are "+
         Integer.toString(affectedLinkTypes.length)+" affected link types, as below:");
-      z = 0;
-      while (z < affectedLinkTypes.length)
+      for (int z = 0; z < affectedLinkTypes.length; z++)
       {
         Logging.hopcount.debug("  Linktype '"+affectedLinkTypes[z]+"', current distance "+Integer.toString(startingAnswers[z].getAnswer())+" with "+
           Integer.toString(startingAnswers[z].countDeleteDependencies())+" delete dependencies.");
-        z++;
       }
     }
 
@@ -817,15 +828,13 @@ public class HopCount extends org.apache
     // so we can make sure they are added to the queue properly.
 
     // Make a map of the combinations of link type and document id we want to have present
-    HashMap matchMap = new HashMap();
+    Map<Question,Long> matchMap = new HashMap();
 
     // Make a map from the link type to the corresponding Answer object
-    HashMap answerMap = new HashMap();
-    int u = 0;
-    while (u < affectedLinkTypes.length)
+    Map<String,Answer> answerMap = new HashMap<String,Answer>();
+    for (int u = 0; u < affectedLinkTypes.length; u++)
     {
       answerMap.put(affectedLinkTypes[u],startingAnswers[u]);
-      u++;
     }
 
     // Do this in a transaction
@@ -838,9 +847,8 @@ public class HopCount extends org.apache
       int maxClause = maxClausePerformFindMissingRecords(jobID,affectedLinkTypes);
       ArrayList list = new ArrayList();
       
-      int i = 0;
       int k = 0;
-      while (i < documentIDHashes.length)
+      for (int i = 0; i < documentIDHashes.length; i++)
       {
         String documentIDHash = documentIDHashes[i];
         
@@ -853,7 +861,6 @@ public class HopCount extends org.apache
         
         list.add(documentIDHash);
         k++;
-        i++;
       }
       if (k > 0)
         performFindMissingRecords(jobID,affectedLinkTypes,list,matchMap);
@@ -864,12 +871,10 @@ public class HopCount extends org.apache
       // for queuing.
 
       HashMap map = new HashMap();
-      i = 0;
-      while (i < documentIDHashes.length)
+      for (int i = 0; i < documentIDHashes.length; i++)
       {
         String documentIDHash = documentIDHashes[i];
-        int j = 0;
-        while (j < affectedLinkTypes.length)
+        for (int j = 0; j < affectedLinkTypes.length; j++)
         {
           String affectedLinkType = affectedLinkTypes[j];
           Question q = new Question(documentIDHash,affectedLinkType);
@@ -927,9 +932,7 @@ public class HopCount extends org.apache
               matchMap.remove(q);
             }
           }
-          j++;
         }
-        i++;
       }
 
       // For all the records still in the matchmap, queue them.
@@ -943,26 +946,24 @@ public class HopCount extends org.apache
       StringBuilder sb = new StringBuilder();
       list = new ArrayList();
       k = 0;
-      i = 0;
-      while (k < documentIDHashes.length)
+      for (int i = 0; i < documentIDHashes.length; i++)
       {
-        String documentIDHash = documentIDHashes[k];
-        int j = 0;
-        while (j < affectedLinkTypes.length)
+        String documentIDHash = documentIDHashes[i];
+        for (int j = 0; j < affectedLinkTypes.length; j++)
         {
           String affectedLinkType = affectedLinkTypes[j];
 
           Question q = new Question(documentIDHash,affectedLinkType);
           if (matchMap.get(q) != null)
           {
-            if (i == maxClause)
+            if (k == maxClause)
             {
               performMarkAddDeps(sb.toString(),list);
-              i = 0;
+              k = 0;
               sb.setLength(0);
               list.clear();
             }
-            if (i > 0)
+            if (k > 0)
               sb.append(" OR ");
 
             // We only want to queue up hopcount records that correspond to the affected link types.
@@ -975,17 +976,17 @@ public class HopCount extends org.apache
             
             sb.append(buildConjunctionClause(list,new ClauseDescription[]{
               new UnitaryClause(jobIDField,jobID),
-              new UnitaryClause(markForDeathField,markToString(MARK_QUEUED)),
+              new MultiClause(markForDeathField,new Object[]{
+                markToString(MARK_NORMAL),
+                markToString(MARK_DELETING)}),
               new UnitaryClause(parentIDHashField,documentIDHash),
               new UnitaryClause(linkTypeField,affectedLinkType)}));
               
-            i++;
+            k++;
           }
-          j++;
         }
-        k++;
       }
-      if (i > 0)
+      if (k > 0)
         performMarkAddDeps(sb.toString(),list);
 
       // Leave the dependency records for the queued rows.  This will save lots of work if we decide not to
@@ -1064,7 +1065,7 @@ public class HopCount extends org.apache
 
   }
 
-  /** Invalidate links that start with or end in a specific set of documents, described by
+  /** Invalidate links that start with a specific set of documents, described by
   * a table join.
   */
   protected void doDeleteDocuments(Long jobID,
@@ -1121,39 +1122,7 @@ public class HopCount extends org.apache
     performUpdate(map,sb.toString(),list,null);
     noteModifications(0,1,0);
       
-      
-    sb = new StringBuilder("WHERE ");
-    list = new ArrayList();
-        
-    sb.append(idField).append(" IN(SELECT t0.").append(deleteDepsManager.ownerIDField).append(" FROM ")
-      .append(deleteDepsManager.getTableName()).append(" t0,").append(joinTableName).append(",")
-      .append(intrinsicLinkManager.getTableName()).append(" t1 WHERE ");
-
-    sb.append(buildConjunctionClause(list,new ClauseDescription[]{
-      new UnitaryClause("t0."+deleteDepsManager.jobIDField,jobID)})).append(" AND ");
-
-    sb.append(buildConjunctionClause(list,new ClauseDescription[]{
-      new UnitaryClause("t1."+intrinsicLinkManager.jobIDField,jobID),
-      new JoinClause("t1."+intrinsicLinkManager.parentIDHashField,"t0."+deleteDepsManager.parentIDHashField),
-      new JoinClause("t1."+intrinsicLinkManager.linkTypeField,"t0."+deleteDepsManager.linkTypeField),
-      new JoinClause("t1."+intrinsicLinkManager.childIDHashField,"t0."+deleteDepsManager.childIDHashField)})).append(" AND ");
-
-    sb.append(buildConjunctionClause(list,new ClauseDescription[]{
-      new UnitaryClause(joinTableJobColumn,jobID),
-      new JoinClause(joinTableIDColumn,"t0."+deleteDepsManager.parentIDHashField)})).append(" AND ");
-          
-    sb.append(joinTableCriteria);
-    list.addAll(joinTableParams);
-
-    sb.append(")");
-
-    map = new HashMap();
-    // These are whacked back to "infinity" to avoid infinite looping in a cut-off graph.
-    map.put(distanceField,new Long(-1L));
-    map.put(markForDeathField,markToString(MARK_DELETING));
-    performUpdate(map,sb.toString(),list,null);
-    noteModifications(0,1,0);
-
+    // We do NOT do the parentID because otherwise we have the potential to delete links that we need later.  See CONNECTORS-501.
 
     if (Logging.hopcount.isDebugEnabled())
       Logging.hopcount.debug("Done setting hopcount rows for job "+jobID+" to initial distances");
@@ -1186,7 +1155,7 @@ public class HopCount extends org.apache
 
   }
   
-  /** Invalidate links that start with or end in a specific set of documents.
+  /** Invalidate links that start with a specific set of documents.
   */
   protected void doDeleteDocuments(Long jobID,
     String[] documentHashes)
@@ -1307,28 +1276,8 @@ public class HopCount extends org.apache
     map.put(markForDeathField,markToString(MARK_DELETING));
     performUpdate(map,sb.toString(),thisList,null);
 
-    sb = new StringBuilder("WHERE ");
-    thisList = new ArrayList();
-
-    sb.append(idField).append(" IN(SELECT ").append(deleteDepsManager.ownerIDField).append(" FROM ").append(deleteDepsManager.getTableName()).append(" t0 WHERE ")
-      .append(buildConjunctionClause(thisList,new ClauseDescription[]{
-        new UnitaryClause("t0."+deleteDepsManager.jobIDField,jobID),
-        new MultiClause("t0."+deleteDepsManager.parentIDHashField,list)})).append(" AND ");
-        
-    sb.append("EXISTS(SELECT 'x' FROM ").append(intrinsicLinkManager.getTableName()).append(" t1 WHERE ")
-      .append(buildConjunctionClause(thisList,new ClauseDescription[]{
-        new JoinClause("t1."+intrinsicLinkManager.jobIDField,"t0."+deleteDepsManager.jobIDField),
-        new JoinClause("t1."+intrinsicLinkManager.linkTypeField,"t0."+deleteDepsManager.linkTypeField),
-        new JoinClause("t1."+intrinsicLinkManager.parentIDHashField,"t0."+deleteDepsManager.parentIDHashField),
-        new JoinClause("t1."+intrinsicLinkManager.childIDHashField,"t0."+deleteDepsManager.childIDHashField)}));
-
-    sb.append("))");
-        
-    map = new HashMap();
-    // These are whacked back to "infinity" to avoid infinite looping in a cut-off graph.
-    map.put(distanceField,new Long(-1L));
-    map.put(markForDeathField,markToString(MARK_DELETING));
-    performUpdate(map,sb.toString(),thisList,null);
+    // We do NOT do the parentID because we need to leave intrinsic links around that could be used again.
+    // See CONNECTORS-501.
   }
 
   /** Invalidate links meeting a simple criteria which have a given set of source documents.  This also runs a queue

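The new boolean return values from recordReference/recordReferences feed directly into the JobManager.addDocuments changes later in this commit: a true entry marks a genuinely new link, which may make a hopcount-removed document reachable again. A condensed sketch of that consumer, adapted from the JobManager hunk below (transaction plumbing omitted):

    // Condensed from the JobManager.addDocuments hunk in this commit.
    boolean[] hopcountChangesSeen = null;
    if (parentIdentifierHash != null && relationshipType != null)
      hopcountChangesSeen = hopCount.recordReferences(jobID,legalLinkTypes,
        parentIdentifierHash,reorderedDocIDHashes,relationshipType,hopcountMethod);

    boolean reactivateRemovedHopcountRecords = false;
    for (int z = 0; z < reorderedDocIDHashes.length; z++)
    {
      // A new link pointing at an existing record can restore reachability.
      if (hopcountChangesSeen != null && hopcountChangesSeen[z])
        reactivateRemovedHopcountRecords = true;
    }

    // Flip all HOPCOUNTREMOVED rows for the job back to PENDING.
    if (reactivateRemovedHopcountRecords)
      jobQueue.reactivateHopcountRemovedRecords(jobID);
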
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/IntrinsicLink.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/IntrinsicLink.java?rev=1372225&r1=1372224&r2=1372225&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/IntrinsicLink.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/IntrinsicLink.java Mon Aug 13 00:44:05 2012
@@ -354,24 +354,9 @@ public class IntrinsicLink extends org.a
               
       performDelete(sb.toString(),list,null);
       noteModifications(0,0,1);
-          
-      // Delete matches for parentIDHashField
-      sb = new StringBuilder("WHERE ");
-      list = new ArrayList();
-          
-      sb.append("EXISTS(SELECT 'x' FROM ").append(joinTableName).append(" WHERE ")
-        .append(buildConjunctionClause(list,new ClauseDescription[]{
-          new UnitaryClause(joinTableJobColumn,jobID),
-          new JoinClause(joinTableIDColumn,getTableName()+"."+parentIDHashField)})).append(" AND ");
-
-      sb.append(joinTableCriteria);
-      list.addAll(joinTableParams);
-              
-      sb.append(")");
-              
-      performDelete(sb.toString(),list,null);
-      noteModifications(0,0,1);
-
+      
+      // DON'T delete ParentID matches; we need to leave those around for bookkeeping to
+      // be correct.  See CONNECTORS-501.
     }
     catch (ManifoldCFException e)
     {
@@ -451,14 +436,8 @@ public class IntrinsicLink extends org.a
       new UnitaryClause(jobIDField,jobID),
       new MultiClause(childIDHashField,list)}));
     performDelete(sb.toString(),thisList,null);
-      
-    sb = new StringBuilder("WHERE ");
-    thisList = new ArrayList();
-
-    sb.append(buildConjunctionClause(thisList,new ClauseDescription[]{
-      new UnitaryClause(jobIDField,jobID),
-      new MultiClause(parentIDHashField,list)}));
-    performDelete(sb.toString(),thisList,null);
+    
+    // DON'T do parentID matches; we need to leave those around.  See CONNECTORS-501.
   }
 
   /** Remove all target links of the specified source documents that are not marked as "new" or "existing", and

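The IntrinsicLink changes above make the delete deliberately one-sided: rows are removed when their child document goes away, but rows whose parent goes away are kept, because those links may be needed to recompute hopcounts if the parent is rediscovered. A standalone sketch of that invariant, assuming plain JDBC and table/column names modeled on the diff (both are assumptions, not the actual ManifoldCF database layer):

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;

    public class AsymmetricLinkDeleteSketch
    {
      /** Remove link rows for a departing document.  Only the child side is
      * deleted; parent-side rows are intentionally left in place
      * (see CONNECTORS-501). */
      static void removeLinks(Connection conn, long jobID, String docIDHash)
        throws SQLException
      {
        try (PreparedStatement ps = conn.prepareStatement(
          "DELETE FROM intrinsiclink WHERE jobid = ? AND childidhash = ?"))
        {
          ps.setLong(1, jobID);
          ps.setString(2, docIDHash);
          ps.executeUpdate();
        }
        // Deliberately no matching DELETE on parentidhash.
      }
    }
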
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1372225&r1=1372224&r2=1372225&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Mon Aug 13 00:44:05 2012
@@ -1469,6 +1469,7 @@ public class JobManager implements IJobM
       .append(" FROM ").append(jobQueue.getTableName()).append(" t0 WHERE ")
       .append(database.buildConjunctionClause(list,new ClauseDescription[]{
         new MultiClause(jobQueue.statusField,new Object[]{
+          JobQueue.statusToString(jobQueue.STATUS_HOPCOUNTREMOVED),
           JobQueue.statusToString(jobQueue.STATUS_PENDING),
           JobQueue.statusToString(jobQueue.STATUS_PENDINGPURGATORY)}),
         new UnitaryClause(jobQueue.prioritySetField,"<",new Long(currentTime))})).append(" AND ")
@@ -2474,7 +2475,11 @@ public class JobManager implements IJobM
     markDocumentCompletedMultiple(new DocumentDescription[]{documentDescription});
   }
 
-  /** Note deletion as result of document processing by a job thread of a document.
+  /** Delete from queue as a result of processing of an active document.
+  * The document is expected to be in one of the active states: ACTIVE, ACTIVESEEDING,
+  * ACTIVENEEDSRESCAN, ACTIVESEEDINGNEEDSRESCAN.  The RESCAN variants are interpreted
+  * as meaning that the document should not be deleted, but should instead be popped back on the queue for
+  * a repeat processing attempt.
   *@param documentDescriptions are the set of description objects for the documents that were processed.
   *@param hopcountMethod describes how to handle deletions for hopcount purposes.
   *@return the set of documents for which carrydown data was changed by this operation.  These documents are likely
@@ -2484,6 +2489,237 @@ public class JobManager implements IJobM
     int hopcountMethod)
     throws ManifoldCFException
   {
+    // It's no longer an issue to have to deal with documents being conditionally deleted; that's been
+    // taken over by the hopcountremoval method below.  So just use the simple 'delete' functionality.
+    return doDeleteMultiple(jobID,legalLinkTypes,documentDescriptions,hopcountMethod);
+  }
+
+  /** Delete from queue as a result of processing of an active document.
+  * The document is expected to be in one of the active states: ACTIVE, ACTIVESEEDING,
+  * ACTIVENEEDSRESCAN, ACTIVESEEDINGNEEDSRESCAN.  The RESCAN variants are interpreted
+  * as meaning that the document should not be deleted, but should instead be popped back on the queue for
+  * a repeat processing attempt.
+  *@param documentDescription is the description object for the document that was processed.
+  *@param hopcountMethod describes how to handle deletions for hopcount purposes.
+  *@return the set of documents for which carrydown data was changed by this operation.  These documents are likely
+  *  to be requeued as a result of the change.
+  */
+  public DocumentDescription[] markDocumentDeleted(Long jobID, String[] legalLinkTypes, DocumentDescription documentDescription,
+    int hopcountMethod)
+    throws ManifoldCFException
+  {
+    return markDocumentDeletedMultiple(jobID,legalLinkTypes,new DocumentDescription[]{documentDescription},hopcountMethod);
+  }
+
+  /** Mark hopcount removal from queue as a result of processing of an active document.
+  * The document is expected to be in one of the active states: ACTIVE, ACTIVESEEDING,
+  * ACTIVENEEDSRESCAN, ACTIVESEEDINGNEEDSRESCAN.  The RESCAN variants are interpreted
+  * as meaning that the document should not be marked as removed, but should instead be popped back on the queue for
+  * a repeat processing attempt.
+  *@param documentDescriptions are the set of description objects for the documents that were processed.
+  *@param hopcountMethod describes how to handle deletions for hopcount purposes.
+  *@return the set of documents for which carrydown data was changed by this operation.  These documents are likely
+  *  to be requeued as a result of the change.
+  */
+  public DocumentDescription[] markDocumentHopcountRemovalMultiple(Long jobID, String[] legalLinkTypes, DocumentDescription[] documentDescriptions,
+    int hopcountMethod)
+    throws ManifoldCFException
+  {
+    // For each record, we're going to have to choose between marking it as "hopcount removed" and marking
+    // it for rescan.  So the basic flow will involve changing a document's status.
+    
+    // Before we can change a document status, we need to know the *current* status.  Therefore, a SELECT xxx FOR UPDATE/UPDATE
+    // transaction is needed in order to complete these documents correctly.
+    //
+    // Since we are therefore setting row locks on the jobqueue table, we need to work to avoid unnecessary deadlocking.  To do that, we have to
+    // lock rows in document id hash order!!  Luckily, the DocumentDescription objects have a document identifier buried within, which we can use to
+    // order the "select for update" operations appropriately.
+    //
+
+    HashMap indexMap = new HashMap();
+    String[] docIDHashes = new String[documentDescriptions.length];
+
+    int i = 0;
+    while (i < documentDescriptions.length)
+    {
+      String documentIDHash = documentDescriptions[i].getDocumentIdentifierHash() + ":" + documentDescriptions[i].getJobID();
+      docIDHashes[i] = documentIDHash;
+      indexMap.put(documentIDHash,new Integer(i));
+      i++;
+    }
+
+    java.util.Arrays.sort(docIDHashes);
+
+    // Retry loop - in case we get a deadlock despite our best efforts
+    while (true)
+    {
+      long sleepAmt = 0L;
+      database.beginTransaction(database.TRANSACTION_SERIALIZED);
+      try
+      {
+        // Do one row at a time, to avoid deadlocking things
+        List<String> deleteList = new ArrayList<String>();
+        
+        i = 0;
+        while (i < docIDHashes.length)
+        {
+          String docIDHash = docIDHashes[i];
+
+          // Get the DocumentDescription object
+          DocumentDescription dd = documentDescriptions[((Integer)indexMap.get(docIDHash)).intValue()];
+
+          // Query for the status
+          ArrayList list = new ArrayList();
+          String query = database.buildConjunctionClause(list,new ClauseDescription[]{
+            new UnitaryClause(jobQueue.idField,dd.getID())});
+          IResultSet set = database.performQuery("SELECT "+jobQueue.statusField+" FROM "+jobQueue.getTableName()+" WHERE "+
+            query+" FOR UPDATE",list,null,null);
+          if (set.getRowCount() > 0)
+          {
+            IResultRow row = set.getRow(0);
+            // Grab the status
+            int status = jobQueue.stringToStatus((String)row.getValue(jobQueue.statusField));
+            // Update the jobqueue table
+            boolean didDelete = jobQueue.updateOrHopcountRemoveRecord(dd.getID(),status);
+            if (didDelete)
+            {
+              deleteList.add(dd.getDocumentIdentifierHash());
+            }
+          }
+          i++;
+        }
+        
+        String[] docIDSimpleHashes = new String[deleteList.size()];
+        for (int j = 0; j < docIDSimpleHashes.length; j++)
+        {
+          docIDSimpleHashes[j] = deleteList.get(j);
+        }
+        
+        // Next, find the documents that are affected by carrydown deletion.
+        DocumentDescription[] rval = calculateAffectedDeleteCarrydownChildren(jobID,docIDSimpleHashes);
+
+        // Since hopcount inheritance and prerequisites came from the addDocument() method,
+        // we don't delete them here.
+        
+        database.performCommit();
+        return rval;
+      }
+      catch (ManifoldCFException e)
+      {
+        database.signalRollback();
+        if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+        {
+          if (Logging.perf.isDebugEnabled())
+            Logging.perf.debug("Aborted transaction marking completed "+Integer.toString(docIDHashes.length)+
+            " docs: "+e.getMessage());
+          sleepAmt = getRandomAmount();
+          continue;
+        }
+        throw e;
+      }
+      catch (Error e)
+      {
+        database.signalRollback();
+        throw e;
+      }
+      finally
+      {
+        database.endTransaction();
+        sleepFor(sleepAmt);
+      }
+    }
+  }
+
+  /** Mark hopcount removal from queue as a result of processing of an active document.
+  * The document is expected to be in one of the active states: ACTIVE, ACTIVESEEDING,
+  * ACTIVENEEDSRESCAN, ACTIVESEEDINGNEEDSRESCAN.  The RESCAN variants are interpreted
+  * as meaning that the document should not be marked as removed, but should instead be popped back on the queue for
+  * a repeat processing attempt.
+  *@param documentDescription is the description object for the document that was processed.
+  *@param hopcountMethod describes how to handle deletions for hopcount purposes.
+  *@return the set of documents for which carrydown data was changed by this operation.  These documents are likely
+  *  to be requeued as a result of the change.
+  */
+  public DocumentDescription[] markDocumentHopcountRemoval(Long jobID, String[] legalLinkTypes, DocumentDescription documentDescription,
+    int hopcountMethod)
+    throws ManifoldCFException
+  {
+    return markDocumentHopcountRemovalMultiple(jobID,legalLinkTypes,new DocumentDescription[]{documentDescription},hopcountMethod);
+  }
+
+  /** Delete from queue as a result of expiration of an active document.
+  * The document is expected to be in one of the active states: ACTIVE, ACTIVESEEDING,
+  * ACTIVENEEDSRESCAN, ACTIVESEEDINGNEEDSRESCAN.  Since the document expired,
+  * no special activity takes place as a result of the document being in a RESCAN state.
+  *@param documentDescriptions are the set of description objects for the documents that were processed.
+  *@param hopcountMethod describes how to handle deletions for hopcount purposes.
+  *@return the set of documents for which carrydown data was changed by this operation.  These documents are likely
+  *  to be requeued as a result of the change.
+  */
+  public DocumentDescription[] markDocumentExpiredMultiple(Long jobID, String[] legalLinkTypes, DocumentDescription[] documentDescriptions,
+    int hopcountMethod)
+    throws ManifoldCFException
+  {
+    return doDeleteMultiple(jobID,legalLinkTypes,documentDescriptions,hopcountMethod);
+  }
+  
+  /** Delete from queue as a result of expiration of an active document.
+  * The document is expected to be in one of the active states: ACTIVE, ACTIVESEEDING,
+  * ACTIVENEEDSRESCAN, ACTIVESEEDINGNEEDSRESCAN.  Since the document expired,
+  * no special activity takes place as a result of the document being in a RESCAN state.
+  *@param documentDescription is the description object for the document that was processed.
+  *@param hopcountMethod describes how to handle deletions for hopcount purposes.
+  *@return the set of documents for which carrydown data was changed by this operation.  These documents are likely
+  *  to be requeued as a result of the change.
+  */
+  public DocumentDescription[] markDocumentExpired(Long jobID, String[] legalLinkTypes, DocumentDescription documentDescription,
+    int hopcountMethod)
+    throws ManifoldCFException
+  {
+    return markDocumentExpiredMultiple(jobID,legalLinkTypes,new DocumentDescription[]{documentDescription},hopcountMethod);
+  }
+
+  /** Delete from queue as a result of cleaning up an unreachable document.
+  * The document is expected to be in the PURGATORY state.  There is never any need to reprocess the
+  * document.
+  *@param documentDescriptions are the set of description objects for the documents that were processed.
+  *@param hopcountMethod describes how to handle deletions for hopcount purposes.
+  *@return the set of documents for which carrydown data was changed by this operation.  These documents are likely
+  *  to be requeued as a result of the change.
+  */
+  public DocumentDescription[] markDocumentCleanedUpMultiple(Long jobID, String[] legalLinkTypes, DocumentDescription[] documentDescriptions,
+    int hopcountMethod)
+    throws ManifoldCFException
+  {
+    return doDeleteMultiple(jobID,legalLinkTypes,documentDescriptions,hopcountMethod);
+  }
+
+  /** Delete from queue as a result of cleaning up an unreachable document.
+  * The document is expected to be in the PURGATORY state.  There is never any need to reprocess the
+  * document.
+  *@param documentDescription is the description object for the document that was processed.
+  *@param hopcountMethod describes how to handle deletions for hopcount purposes.
+  *@return the set of documents for which carrydown data was changed by this operation.  These documents are likely
+  *  to be requeued as a result of the change.
+  */
+  public DocumentDescription[] markDocumentCleanedUp(Long jobID, String[] legalLinkTypes, DocumentDescription documentDescription,
+    int hopcountMethod)
+    throws ManifoldCFException
+  {
+    return markDocumentCleanedUpMultiple(jobID,legalLinkTypes,new DocumentDescription[]{documentDescription},hopcountMethod);
+  }
+
+  /** Delete documents with no repercussions.  We don't have to worry about the current state of each document,
+  * since the document is definitely going away.
+  *@param documentDescriptions are the set of description objects for the documents that were processed.
+  *@param hopcountMethod describes how to handle deletions for hopcount purposes.
+  *@return the set of documents for which carrydown data was changed by this operation.  These documents are likely
+  *  to be requeued as a result of the change.
+  */
+  protected DocumentDescription[] doDeleteMultiple(Long jobID, String[] legalLinkTypes, DocumentDescription[] documentDescriptions,
+    int hopcountMethod)
+    throws ManifoldCFException
+  {
     if (documentDescriptions.length == 0)
       return new DocumentDescription[0];
 
@@ -2684,18 +2920,6 @@ public class JobManager implements IJobM
     }
   }
 
-  /** Note deletion as result of document processing by a job thread of a document.
-  *@param documentDescription is the description object for the document that was processed.
-  *@param hopcountMethod describes how to handle deletions for hopcount purposes.
-  *@return the set of documents for which carrydown data was changed by this operation.  These documents are likely
-  *  to be requeued as a result of the change.
-  */
-  public DocumentDescription[] markDocumentDeleted(Long jobID, String[] legalLinkTypes, DocumentDescription documentDescription,
-    int hopcountMethod)
-    throws ManifoldCFException
-  {
-    return markDocumentDeletedMultiple(jobID,legalLinkTypes,new DocumentDescription[]{documentDescription},hopcountMethod);
-  }
 
 
   /** Requeue a document for further processing in the future.
@@ -3845,8 +4069,7 @@ public class JobManager implements IJobM
         // Go through document id's one at a time, in order - mainly to prevent deadlock as much as possible.  Search for any existing row in jobqueue first (for update)
         HashMap existingRows = new HashMap();
 
-        int z = 0;
-        while (z < reorderedDocIDHashes.length)
+        for (int z = 0; z < reorderedDocIDHashes.length; z++)
         {
           String docIDHash = reorderedDocIDHashes[z];
 
@@ -3885,17 +4108,22 @@ public class JobManager implements IJobM
             jobQueue.insertNewRecord(jobID,docIDHash,reorderedDocumentIdentifiers[z],reorderedDocumentPriorities[z],0L,currentTime,reorderedDocumentPrerequisites[z]);
           }
 
-          z++;
         }
 
         // Update all the carrydown data at once, for greatest efficiency.
         boolean[] carrydownChangesSeen = carryDown.recordCarrydownDataMultiple(jobID,parentIdentifierHash,reorderedDocIDHashes,dataNames,dataHashValues,dataValues);
 
+        // Same with hopcount.
+        boolean[] hopcountChangesSeen = null;
+        if (parentIdentifierHash != null && relationshipType != null)
+          hopcountChangesSeen = hopCount.recordReferences(jobID,legalLinkTypes,parentIdentifierHash,reorderedDocIDHashes,relationshipType,hopcountMethod);
+
         // Loop through the document id's again, and perform updates where needed
         boolean[] reorderedRval = new boolean[reorderedDocIDHashes.length];
 
-        z = 0;
-        while (z < reorderedDocIDHashes.length)
+        boolean reactivateRemovedHopcountRecords = false;
+        
+        for (int z = 0; z < reorderedDocIDHashes.length; z++)
         {
           String docIDHash = reorderedDocIDHashes[z];
           JobqueueRecord jr = (JobqueueRecord)existingRows.get(docIDHash);
@@ -3903,14 +4131,22 @@ public class JobManager implements IJobM
             // It was an insert
             reorderedRval[z] = true;
           else
+          {
             // It was an existing row; do the update logic
+            // The hopcountChangesSeen array describes whether each reference is a new one.  This
+            // helps us determine whether we're going to need to "flip" HOPCOUNTREMOVED documents
+            // to the PENDING state.  If the new link ended in an existing record, THEN we need to flip them all!
             reorderedRval[z] = jobQueue.updateExistingRecord(jr.getRecordID(),jr.getStatus(),jr.getCheckTimeValue(),
-            0L,currentTime,carrydownChangesSeen[z],reorderedDocumentPriorities[z],reorderedDocumentPrerequisites[z]);
-          z++;
+              0L,currentTime,carrydownChangesSeen[z] || (hopcountChangesSeen!=null && hopcountChangesSeen[z]),
+              reorderedDocumentPriorities[z],reorderedDocumentPrerequisites[z]);
+            // Signal if we need to perform the flip
+            if (hopcountChangesSeen != null && hopcountChangesSeen[z])
+              reactivateRemovedHopcountRecords = true;
+          }
         }
 
-        if (parentIdentifierHash != null && relationshipType != null)
-          hopCount.recordReferences(jobID,legalLinkTypes,parentIdentifierHash,reorderedDocIDHashes,relationshipType,hopcountMethod);
+        if (reactivateRemovedHopcountRecords)
+          jobQueue.reactivateHopcountRemovedRecords(jobID);
 
         database.performCommit();
         
@@ -5128,10 +5364,13 @@ public class JobManager implements IJobM
         if (legalLinkTypes.length > 0)
         {
           ArrayList list = new ArrayList();
-          list.add(jobQueue.statusToString(jobQueue.STATUS_PENDING));
+          String query = database.buildConjunctionClause(list,new ClauseDescription[]{
+            new MultiClause("t99."+jobQueue.statusField,new Object[]{
+              jobQueue.statusToString(jobQueue.STATUS_PENDING),
+              jobQueue.statusToString(jobQueue.STATUS_HOPCOUNTREMOVED)})});
           hopCount.deleteMatchingDocuments(jobID,legalLinkTypes,jobQueue.getTableName()+" t99",
             "t99."+jobQueue.docHashField,"t99."+jobQueue.jobIDField,
-            "t99."+jobQueue.statusField+"=?",list,
+            query,list,
             hopcountMethod);
         }
 
@@ -6557,7 +6796,6 @@ public class JobManager implements IJobM
         IJobDescription jobDesc = jobs.load(jobID,true);
         resetJobs.add(jobDesc);
             
-        // Label the job "finished"
         jobs.finishJob(jobID,currentTime);
         if (Logging.jobs.isDebugEnabled())
         {
@@ -6567,7 +6805,7 @@ public class JobManager implements IJobM
     }
   }
 
-
+  
   // Status reports
 
   /** Get the status of a job.

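markDocumentHopcountRemovalMultiple above avoids deadlock by locking jobqueue rows one at a time, in sorted document-hash order, inside a serialized transaction with a retry loop. The ordering rule generalizes: every thread must acquire its row locks in the same global order. A minimal standalone sketch of that idiom (plain JDBC; the table and column names are hypothetical):

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.util.Arrays;

    public class LockOrderSketch
    {
      /** Lock a set of rows with SELECT ... FOR UPDATE without risking
      * lock-order deadlocks: sort the keys first, so every thread acquires
      * its locks in the same global order. */
      static void lockRows(Connection conn, String[] keys) throws Exception
      {
        String[] sorted = keys.clone();
        Arrays.sort(sorted);               // canonical global order
        try (PreparedStatement ps = conn.prepareStatement(
          "SELECT status FROM jobqueue WHERE dochash = ? FOR UPDATE"))
        {
          for (String key : sorted)        // one row at a time, in order
          {
            ps.setString(1, key);
            ps.executeQuery().close();     // row lock is held until commit
          }
        }
      }
    }
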
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java?rev=1372225&r1=1372224&r2=1372225&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java Mon Aug 13 00:44:05 2012
@@ -68,7 +68,7 @@ public class JobQueue extends org.apache
   public final static int STATUS_ACTIVENEEDRESCANPURGATORY = 8;
   public final static int STATUS_BEINGCLEANED = 9;
   public final static int STATUS_ELIGIBLEFORDELETE = 10;
-
+  public final static int STATUS_HOPCOUNTREMOVED = 11;
   // Action values
   public final static int ACTION_RESCAN = 0;
   public final static int ACTION_REMOVE = 1;
@@ -126,6 +126,7 @@ public class JobQueue extends org.apache
     statusMap.put("a",new Integer(STATUS_ACTIVENEEDRESCAN));
     statusMap.put("f",new Integer(STATUS_ACTIVENEEDRESCANPURGATORY));
     statusMap.put("d",new Integer(STATUS_BEINGCLEANED));
+    statusMap.put("H",new Integer(STATUS_HOPCOUNTREMOVED));
   }
 
   protected static Map seedstatusMap;
@@ -364,6 +365,35 @@ public class JobQueue extends org.apache
     unconditionallyAnalyzeTables();
   }
 
+  /** Flip all records for a job that have status HOPCOUNTREMOVED back to PENDING.
+  * NOTE: We need to actually schedule these!!!  so the following can't really work.  ???
+  */
+  public void reactivateHopcountRemovedRecords(Long jobID)
+    throws ManifoldCFException
+  {
+    Map map = new HashMap();
+    // Map HOPCOUNTREMOVED to PENDING
+    map.put(statusField,statusToString(STATUS_PENDING));
+    map.put(checkTimeField,new Long(0L));
+    ArrayList list = new ArrayList();
+    String query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(jobIDField,jobID),
+      new UnitaryClause(statusField,statusToString(STATUS_HOPCOUNTREMOVED))});
+    performUpdate(map,"WHERE "+query,list,null);
+  }
+
+  /** Delete all records for a job that have status HOPCOUNTREMOVED.
+  */
+  public void deleteHopcountRemovedRecords(Long jobID)
+    throws ManifoldCFException
+  {
+    ArrayList list = new ArrayList();
+    String query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(jobIDField,jobID),
+      new UnitaryClause(statusField,statusToString(STATUS_HOPCOUNTREMOVED))});
+    performDelete("WHERE "+query,list,null);
+  }
+
   /** Clear the failtimes for all documents associated with a job.
   * This method is called when the system detects that a significant delaying event has occurred,
   * and therefore the "failure clock" needs to be reset.
@@ -451,14 +481,19 @@ public class JobQueue extends org.apache
   {
     // Delete PENDING entries
     ArrayList list = new ArrayList();
-    list.add(jobID);
-    list.add(statusToString(STATUS_PENDING));
+    String query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause("t0."+jobIDField,jobID),
+      new MultiClause("t0."+statusField,new Object[]{
+        statusToString(STATUS_PENDING),
+        statusToString(STATUS_HOPCOUNTREMOVED)})});
     // Clean out prereqevents table first
-    prereqEventManager.deleteRows(getTableName()+" t0","t0."+idField,"t0."+jobIDField+"=? AND t0."+statusField+"=?",list);
+    prereqEventManager.deleteRows(getTableName()+" t0","t0."+idField,query,list);
     list.clear();
-    String query = buildConjunctionClause(list,new ClauseDescription[]{
+    query = buildConjunctionClause(list,new ClauseDescription[]{
       new UnitaryClause(jobIDField,jobID),
-      new UnitaryClause(statusField,statusToString(STATUS_PENDING))});
+      new MultiClause(statusField,new Object[]{
+        statusToString(STATUS_PENDING),
+        statusToString(STATUS_HOPCOUNTREMOVED)})});
     performDelete("WHERE "+query,list,null);
 
     // Turn PENDINGPURGATORY, PURGATORY, COMPLETED into ELIGIBLEFORDELETE.
@@ -687,6 +722,96 @@ public class JobQueue extends org.apache
     performUpdate(map,"WHERE "+query,list,null);
   }
 
+  /** Either delete a record, or set status to "rescan", depending on the
+  * record's state.
+  */
+  public boolean updateOrDeleteRecord(Long recID, int currentStatus)
+    throws ManifoldCFException
+  {
+    HashMap map = new HashMap();
+    
+    int newStatus;
+    String actionFieldValue;
+    Long checkTimeValue;
+    
+    switch (currentStatus)
+    {
+    case STATUS_ACTIVE:
+    case STATUS_ACTIVEPURGATORY:
+      // Delete it
+      deleteRecord(recID);
+      return true;
+    case STATUS_ACTIVENEEDRESCAN:
+    case STATUS_ACTIVENEEDRESCANPURGATORY:
+      newStatus = STATUS_PENDINGPURGATORY;
+      actionFieldValue = actionToString(ACTION_RESCAN);
+      checkTimeValue = new Long(0L);
+      // Leave doc priority unchanged.
+      break;
+    default:
+      throw new ManifoldCFException("Unexpected jobqueue status - record id "+recID.toString()+", expecting active status, saw "+Integer.toString(currentStatus));
+    }
+
+    map.put(statusField,statusToString(newStatus));
+    map.put(checkTimeField,checkTimeValue);
+    map.put(checkActionField,actionFieldValue);
+    map.put(failTimeField,null);
+    map.put(failCountField,null);
+    ArrayList list = new ArrayList();
+    String query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(idField,recID)});
+    performUpdate(map,"WHERE "+query,list,null);
+    return false;
+  }
+
+  /** Either mark a record as hopcountremoved, or set status to "rescan", depending on the
+  * record's state.
+  */
+  public boolean updateOrHopcountRemoveRecord(Long recID, int currentStatus)
+    throws ManifoldCFException
+  {
+    HashMap map = new HashMap();
+    
+    int newStatus;
+    String actionFieldValue;
+    Long checkTimeValue;
+    
+    boolean rval;
+    
+    switch (currentStatus)
+    {
+    case STATUS_ACTIVE:
+    case STATUS_ACTIVEPURGATORY:
+      // Mark as hopcountremove
+      newStatus = STATUS_HOPCOUNTREMOVED;
+      actionFieldValue = actionToString(ACTION_RESCAN);
+      checkTimeValue = new Long(0L);
+      rval = true;
+      break;
+    case STATUS_ACTIVENEEDRESCAN:
+    case STATUS_ACTIVENEEDRESCANPURGATORY:
+      newStatus = STATUS_PENDINGPURGATORY;
+      actionFieldValue = actionToString(ACTION_RESCAN);
+      checkTimeValue = new Long(0L);
+      rval = false;
+      // Leave doc priority unchanged.
+      break;
+    default:
+      throw new ManifoldCFException("Unexpected jobqueue status - record id "+recID.toString()+", expecting active status, saw "+Integer.toString(currentStatus));
+    }
+
+    map.put(statusField,statusToString(newStatus));
+    map.put(checkTimeField,checkTimeValue);
+    map.put(checkActionField,actionFieldValue);
+    map.put(failTimeField,null);
+    map.put(failCountField,null);
+    ArrayList list = new ArrayList();
+    String query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(idField,recID)});
+    performUpdate(map,"WHERE "+query,list,null);
+    return rval;
+  }
+
   /** Set the status to active on a record, leaving alone priority or check time.
   *@param id is the job queue id.
   *@param currentStatus is the current status
@@ -1159,10 +1284,11 @@ public class JobQueue extends org.apache
 
   /** Update an existing record (as the result of a reference add).
   * The record is presumed to exist and have been locked, via "FOR UPDATE".
+  *@return true if the document priority slot has been retained, false if freed.
   */
   public boolean updateExistingRecord(Long recordID, int currentStatus, Long checkTimeValue,
-    long desiredExecuteTime, long currentTime, boolean otherChangesSeen, double desiredPriority,
-    String[] prereqEvents)
+    long desiredExecuteTime, long currentTime, boolean otherChangesSeen,
+    double desiredPriority, String[] prereqEvents)
     throws ManifoldCFException
   {
     boolean rval = false;
@@ -1414,6 +1540,8 @@ public class JobQueue extends org.apache
       return "f";
     case STATUS_BEINGCLEANED:
       return "d";
+    case STATUS_HOPCOUNTREMOVED:
+      return "H";
     default:
       throw new ManifoldCFException("Bad status value: "+Integer.toString(status));
     }

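The JobQueue hunks add STATUS_HOPCOUNTREMOVED in three coordinated places: the integer constant, the statusMap parse entry ("H"), and the statusToString case. The two directions must stay in sync, or reading and writing the status column will throw on round-trip. A minimal sketch of that invariant, reduced to just the one new state (standalone class; exception types simplified):

    import java.util.HashMap;
    import java.util.Map;

    public class StatusMappingSketch
    {
      public final static int STATUS_HOPCOUNTREMOVED = 11;

      // Parse direction: database character -> status constant.
      static final Map<String,Integer> statusMap = new HashMap<String,Integer>();
      static
      {
        statusMap.put("H",new Integer(STATUS_HOPCOUNTREMOVED));
      }

      // Format direction: status constant -> database character.
      static String statusToString(int status)
      {
        switch (status)
        {
        case STATUS_HOPCOUNTREMOVED:
          return "H";
        default:
          throw new IllegalArgumentException("Bad status value: "+Integer.toString(status));
        }
      }

      static int stringToStatus(String value)
      {
        Integer x = statusMap.get(value);
        if (x == null)
          throw new IllegalArgumentException("Bad status string: "+value);
        return x.intValue();
      }
    }
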
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java?rev=1372225&r1=1372224&r2=1372225&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java Mon Aug 13 00:44:05 2012
@@ -232,7 +232,7 @@ public class DocumentCleanupThread exten
                   Long jobID = ddd.getJobID();
                   int hopcountMethod = ((Integer)hopcountMethods.get(k)).intValue();
                   String[] legalLinkTypes = (String[])arrayRelationshipTypes.get(k);
-                  DocumentDescription[] requeueCandidates = jobManager.markDocumentDeleted(jobID,legalLinkTypes,ddd,hopcountMethod);
+                  DocumentDescription[] requeueCandidates = jobManager.markDocumentCleanedUp(jobID,legalLinkTypes,ddd,hopcountMethod);
                   // Use the common method for doing the requeuing
                   ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,
                     connector,connection,queueTracker,currentTime);

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java?rev=1372225&r1=1372224&r2=1372225&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java Mon Aug 13 00:44:05 2012
@@ -235,7 +235,7 @@ public class ExpireThread extends Thread
                   Long jobID = ddd.getJobID();
                   int hopcountMethod = ((Integer)hopcountMethods.get(k)).intValue();
                   String[] legalLinkTypes = (String[])arrayRelationshipTypes.get(k);
-                  DocumentDescription[] requeueCandidates = jobManager.markDocumentDeleted(jobID,legalLinkTypes,ddd,hopcountMethod);
+                  DocumentDescription[] requeueCandidates = jobManager.markDocumentExpired(jobID,legalLinkTypes,ddd,hopcountMethod);
                   // Use the common method for doing the requeuing
                   ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,
                     connector,connection,queueTracker,currentTime);