Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2011/10/16 16:26:15 UTC

svn commit: r1184833 - in /incubator/lcf/trunk: CHANGES.txt framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java

Author: kwright
Date: Sun Oct 16 14:26:14 2011
New Revision: 1184833

URL: http://svn.apache.org/viewvc?rev=1184833&view=rev
Log:
Fix for CONNECTORS-187.  Service interruptions in worker thread during document deletion now cause documents to be requeued.
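
In outline: the old code looped and slept until the output connection recovered, while the new code catches the ServiceInterruption once, splits the pending job-queue deletions into documents whose index deletion did not go through (these are requeued using the retry time, fail time, and fail retry count carried by the exception) and documents that only need job-queue cleanup, then proceeds. A minimal sketch of that pattern follows; the types here (Indexer, Queue, ServiceInterruption fields) are simplified stand-ins, not the real ManifoldCF interfaces, and the actual code is in WorkerThread.processDeleteLists() in the diff below.

  // Simplified illustration of the requeue-on-interruption pattern introduced here.
  // Indexer, Queue, and this ServiceInterruption are stand-ins, not ManifoldCF classes.
  import java.util.*;

  class DeletionSketch
  {
    static class ServiceInterruption extends Exception
    {
      long retryTime, failTime;
      int failRetryCount;
    }

    interface Indexer
    {
      void deleteMultiple(List<String> ids) throws ServiceInterruption;
    }

    interface Queue
    {
      void requeue(List<String> ids, long retryTime, long failTime, int failCount);
      void delete(List<String> ids);
    }

    static void processDeletes(Indexer indexer, Queue queue,
      List<String> indexDeleteIds, List<String> queueDeleteIds)
    {
      try
      {
        // May throw ServiceInterruption if the output connection is down.
        indexer.deleteMultiple(indexDeleteIds);
      }
      catch (ServiceInterruption e)
      {
        // Index is unavailable: requeue the documents whose index deletion failed,
        // and keep only the remainder for job-queue cleanup.
        Set<String> failed = new HashSet<String>(indexDeleteIds);
        List<String> requeueIds = new ArrayList<String>();
        List<String> cleanupOnly = new ArrayList<String>();
        for (String id : queueDeleteIds)
        {
          if (failed.contains(id))
            requeueIds.add(id);
          else
            cleanupOnly.add(id);
        }
        queue.requeue(requeueIds, e.retryTime, e.failTime, e.failRetryCount);
        queueDeleteIds = cleanupOnly;
      }
      // Job-queue cleanup always runs, whether or not the index delete succeeded.
      queue.delete(queueDeleteIds);
    }
  }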

Modified:
    incubator/lcf/trunk/CHANGES.txt
    incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java

Modified: incubator/lcf/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/CHANGES.txt?rev=1184833&r1=1184832&r2=1184833&view=diff
==============================================================================
--- incubator/lcf/trunk/CHANGES.txt (original)
+++ incubator/lcf/trunk/CHANGES.txt Sun Oct 16 14:26:14 2011
@@ -3,6 +3,10 @@ $Id$
 
 ======================= 0.4-dev =====================
 
+CONNECTORS-187: Fix how the worker threads handle document deletion
+service interruptions.
+(Karl Wright)
+
 CONNECTORS-271: Wiki connector now has end-user documentation.
 (Karl Wright)
 

Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1184833&r1=1184832&r2=1184833&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java Sun Oct 16 14:26:14 2011
@@ -73,29 +73,29 @@ public class WorkerThread extends Thread
       IRepositoryConnectionManager connMgr = RepositoryConnectionManagerFactory.make(threadContext);
       IOutputConnectionManager outputMgr = OutputConnectionManagerFactory.make(threadContext);
 
-      ArrayList fetchList = new ArrayList();
-      HashMap versionMap = new HashMap();
-      ArrayList finishList = new ArrayList();
-      HashMap idHashIndexMap = new HashMap();
+      List<DocumentToProcess> fetchList = new ArrayList<DocumentToProcess>();
+      Map<String,String> versionMap = new HashMap<String,String>();
+      List<QueuedDocument> finishList = new ArrayList<QueuedDocument>();
+      Map<String,Integer> idHashIndexMap = new HashMap<String,Integer>();
 
       // This is the place we accumulate documents to be deleted via the ingester; each member is a
       // document ID string.
-      ArrayList ingesterDeleteList = new ArrayList();
-      ArrayList ingesterDeleteListUnhashed = new ArrayList();
+      List<String> ingesterDeleteList = new ArrayList<String>();
+      List<String> ingesterDeleteListUnhashed = new ArrayList<String>();
 
       // This is where we accumulate the document QueuedDocuments to be deleted from the job queue.
       // These MUST be cleaned up after the ingester delete is called, in order to guarantee
       // cleanup happens!  Also note that there will be items in here that are NOT in the ingester
       // delete list...
-      ArrayList jobmanagerDeleteList = new ArrayList();
+      List<QueuedDocument> jobmanagerDeleteList = new ArrayList<QueuedDocument>();
 
       // This is where we store document ID strings of documents that need to be noted as having
       // been checked.
-      ArrayList ingesterCheckList = new ArrayList();
+      List<String> ingesterCheckList = new ArrayList<String>();
 
       // Temporary list
-      ArrayList tempIDHashList = new ArrayList();
-      ArrayList tempIDList = new ArrayList();
+      List<String> tempIDHashList = new ArrayList<String>();
+      List<String> tempIDList = new ArrayList<String>();
 
       // Loop
       while (true)
@@ -224,7 +224,7 @@ public class WorkerThread extends Thread
                     if (results[z] == false)
                     {
                       // Chuck it
-                      currentVersions[((Integer)idHashIndexMap.get(currentDocIDHashArray[z])).intValue()] = null;
+                      currentVersions[idHashIndexMap.get(currentDocIDHashArray[z]).intValue()] = null;
                     }
                     else
                     {
@@ -239,8 +239,8 @@ public class WorkerThread extends Thread
                   z = 0;
                   while (z < currentDocIDHashArray.length)
                   {
-                    currentDocIDHashArray[z] = (String)tempIDHashList.get(z);
-                    currentDocIDArray[z] = (String)tempIDList.get(z);
+                    currentDocIDHashArray[z] = tempIDHashList.get(z);
+                    currentDocIDArray[z] = tempIDList.get(z);
                     z++;
                   }
                 }
@@ -254,7 +254,7 @@ public class WorkerThread extends Thread
                 while (z < oldVersionStringArray.length)
                 {
                   String idHashValue = currentDocIDHashArray[z];
-                  QueuedDocument qd = qds.getDocument(((Integer)idHashIndexMap.get(idHashValue)).intValue());
+                  QueuedDocument qd = qds.getDocument(idHashIndexMap.get(idHashValue).intValue());
                   DocumentIngestStatus dis = qd.getLastIngestedStatus();
                   if (dis == null)
                     oldVersionStringArray[z] = null;
@@ -540,11 +540,11 @@ public class WorkerThread extends Thread
                           i = 0;
                           while (i < fetchList.size())
                           {
-                            DocumentToProcess dToP = (DocumentToProcess)fetchList.get(i);
+                            DocumentToProcess dToP = fetchList.get(i);
                             DocumentDescription dd = dToP.getDocument().getDocumentDescription();
                             processIDs[i] = dd.getDocumentIdentifier();
                             processIDHashes[i] = dd.getDocumentIdentifierHash();
-                            versions[i] = (String)versionMap.get(dd.getDocumentIdentifierHash());
+                            versions[i] = versionMap.get(dd.getDocumentIdentifierHash());
                             scanOnly[i] = dToP.getScanOnly();
                             i++;
                           }
@@ -596,7 +596,7 @@ public class WorkerThread extends Thread
                             i = 0;
                             while (i < finishList.size())
                             {
-                              QueuedDocument qd = (QueuedDocument)finishList.get(i++);
+                              QueuedDocument qd = finishList.get(i++);
                               DocumentDescription dd = qd.getDocumentDescription();
                               if (dd.getFailTime() != -1L && dd.getFailTime() < e.getRetryTime() ||
                                 dd.getFailRetryCount() == 0)
@@ -647,7 +647,7 @@ public class WorkerThread extends Thread
                           while (i < checkIDs.length)
                           {
                             checkClasses[i] = connectionName;
-                            checkIDs[i] = (String)ingesterCheckList.get(i);
+                            checkIDs[i] = ingesterCheckList.get(i);
                             i++;
                           }
                           ingester.documentCheckMultiple(outputName,checkClasses,checkIDs,currentTime);
@@ -682,7 +682,7 @@ public class WorkerThread extends Thread
                               i = 0;
                               while (i < finishList.size())
                               {
-                                QueuedDocument qd = (QueuedDocument)finishList.get(i);
+                                QueuedDocument qd = finishList.get(i);
                                 recrawlDocs[i] = qd.getDocumentDescription();
                                 String documentID = recrawlDocs[i].getDocumentIdentifier();
 
@@ -747,12 +747,12 @@ public class WorkerThread extends Thread
                           case IJobDescription.TYPE_SPECIFIED:
                             {
                               // Separate the ones we actually finished from the ones we need to requeue because they were aborted
-                              ArrayList completedList = new ArrayList();
-                              ArrayList abortedList = new ArrayList();
+                              List<DocumentDescription> completedList = new ArrayList<DocumentDescription>();
+                              List<DocumentDescription> abortedList = new ArrayList<DocumentDescription>();
                               i = 0;
                               while (i < finishList.size())
                               {
-                                QueuedDocument qd = (QueuedDocument)finishList.get(i++);
+                                QueuedDocument qd = finishList.get(i++);
                                 DocumentDescription dd = qd.getDocumentDescription();
                                 if (abortSet.get(dd.getDocumentIdentifier()) != null)
                                 {
@@ -775,7 +775,7 @@ public class WorkerThread extends Thread
                                 i = 0;
                                 while (i < docDescriptions.length)
                                 {
-                                  docDescriptions[i] = (DocumentDescription)abortedList.get(i);
+                                  docDescriptions[i] = abortedList.get(i);
                                   recheckTimeArray[i] = new Long(0L);
                                   actionArray[i] = IJobManager.ACTION_RESCAN;
                                   i++;
@@ -807,7 +807,7 @@ public class WorkerThread extends Thread
                           i = 0;
                           while (i < finishList.size())
                           {
-                            QueuedDocument qd = (QueuedDocument)finishList.get(i++);
+                            QueuedDocument qd = finishList.get(i++);
                             qd.setProcessed();
                           }
 
@@ -956,8 +956,8 @@ public class WorkerThread extends Thread
   *@param ingesterDeleteList is a list of document id's to delete.
   */
   protected static void processDeleteLists(String outputName, IRepositoryConnector connector,
-    IRepositoryConnection connection, IJobManager jobManager, ArrayList jobmanagerDeleteList,
-    IIncrementalIngester ingester, ArrayList ingesterDeleteList, ArrayList ingesterDeleteListUnhashed,
+    IRepositoryConnection connection, IJobManager jobManager, List<QueuedDocument> jobmanagerDeleteList,
+    IIncrementalIngester ingester, List<String> ingesterDeleteList, List<String> ingesterDeleteListUnhashed,
     Long jobID, String[] legalLinkTypes, OutputActivity ingestLogger,
     int hopcountMethod, QueueTracker queueTracker, long currentTime)
     throws ManifoldCFException
@@ -974,38 +974,60 @@ public class WorkerThread extends Thread
       while (i < ingesterDeleteList.size())
       {
         deleteClasses[i] = connectionName;
-        deleteIDs[i] = (String)ingesterDeleteList.get(i);
+        deleteIDs[i] = ingesterDeleteList.get(i);
         i++;
       }
-      // Delete retry loop: Keep trying until we succeed (if ever).
-      // This is reasonable behavior because a ServiceInterruption due to the ingestion API being down will block most progress anyhow;
-      // it's reasonable therefore to just keep waiting until service is restored.
-      while (true)
+      
+      // Try to delete the documents via the output connection.
+      try
       {
-        try
+        ingester.documentDeleteMultiple(outputName,deleteClasses,deleteIDs,ingestLogger);
+      }
+      catch (ServiceInterruption e)
+      {
+        // It looks like the output connection is not currently functioning, so we need to requeue instead of deleting
+        // those documents that could not be removed.
+        List<QueuedDocument> newJobmanagerDeleteList = new ArrayList<QueuedDocument>();
+        List<QueuedDocument> newRequeueList = new ArrayList<QueuedDocument>();
+        
+        Map<String,String> ingesterMap = new HashMap<String,String>();
+        for (int j = 0 ; j < ingesterDeleteList.size() ; j++)
         {
-          ingester.documentDeleteMultiple(outputName,deleteClasses,deleteIDs,ingestLogger);
-          break;
+          String id = ingesterDeleteList.get(j);
+          ingesterMap.put(id,id);
         }
-        catch (ServiceInterruption e)
+        for (int j = 0 ; j < jobmanagerDeleteList.size() ; j++)
         {
-          long amt = e.getRetryTime();
-          long now = System.currentTimeMillis();
-          long waittime = amt-now;
-          if (waittime <= 0L)
-            waittime = 300000L;
-          try
-          {
-            ManifoldCF.sleep(waittime);
-          }
-          catch (InterruptedException e2)
-          {
-            throw new ManifoldCFException("Interrupted: "+e2.getMessage(),e2,ManifoldCFException.INTERRUPTED);
-          }
+          QueuedDocument qd = jobmanagerDeleteList.get(j);
+          DocumentDescription dd = qd.getDocumentDescription();
+          String documentIdentifier = dd.getDocumentIdentifier();
+          if (ingesterMap.get(documentIdentifier) != null)
+            newRequeueList.add(qd);
+          else
+            newJobmanagerDeleteList.add(qd);
         }
+
+        // Requeue those that are supposed to be requeued
+        requeueDocuments(jobManager,newRequeueList,e.getRetryTime(),e.getFailTime(),
+          e.getFailRetryCount());
+        
+        // Process the ones that are just new job queue changes
+        jobmanagerDeleteList = newJobmanagerDeleteList;
       }
     }
-
+    
+    // Delete from the job queue
+    processJobQueueDeletions(jobmanagerDeleteList,connector,connection,
+      jobManager,jobID,legalLinkTypes,hopcountMethod,queueTracker,currentTime);
+  }
+  
+  /** Process job queue deletions.  Either the indexer has already been updated, or it is not necessary to update it.
+  */
+  protected static void processJobQueueDeletions(List<QueuedDocument> jobmanagerDeleteList,
+    IRepositoryConnector connector, IRepositoryConnection connection, IJobManager jobManager,
+    Long jobID, String[] legalLinkTypes, int hopcountMethod, QueueTracker queueTracker, long currentTime)
+    throws ManifoldCFException
+  {
     // Now, do the document queue cleanup for deletions.
     if (jobmanagerDeleteList.size() > 0)
     {
@@ -1013,7 +1035,7 @@ public class WorkerThread extends Thread
       int i = 0;
       while (i < deleteDescriptions.length)
       {
-        QueuedDocument qd = (QueuedDocument)jobmanagerDeleteList.get(i);
+        QueuedDocument qd = jobmanagerDeleteList.get(i);
         deleteDescriptions[i] = qd.getDocumentDescription();
         i++;
       }
@@ -1028,7 +1050,7 @@ public class WorkerThread extends Thread
       i = 0;
       while (i < jobmanagerDeleteList.size())
       {
-        QueuedDocument qd = (QueuedDocument)jobmanagerDeleteList.get(i++);
+        QueuedDocument qd = jobmanagerDeleteList.get(i++);
         qd.setProcessed();
       }
     }
@@ -1041,7 +1063,7 @@ public class WorkerThread extends Thread
   *@param failTime is the time beyond which retries lead to hard failure.
   *@param failCount is the number of retries allowed until hard failure.
   */
-  protected static void requeueDocuments(IJobManager jobManager, ArrayList requeueList, long retryTime, long failTime, int failCount)
+  protected static void requeueDocuments(IJobManager jobManager, List<QueuedDocument> requeueList, long retryTime, long failTime, int failCount)
     throws ManifoldCFException
   {
     if (requeueList.size() > 0)
@@ -1051,7 +1073,7 @@ public class WorkerThread extends Thread
       int i = 0;
       while (i < requeueDocs.length)
       {
-        QueuedDocument qd = (QueuedDocument)requeueList.get(i);
+        QueuedDocument qd = requeueList.get(i);
         DocumentDescription dd = qd.getDocumentDescription();
         requeueDocs[i] = dd;
         i++;
@@ -1062,7 +1084,7 @@ public class WorkerThread extends Thread
       i = 0;
       while (i < requeueList.size())
       {
-        QueuedDocument qd = (QueuedDocument)requeueList.get(i++);
+        QueuedDocument qd = requeueList.get(i++);
         qd.setProcessed();
       }
     }