You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2011/10/16 16:26:15 UTC
svn commit: r1184833 - in /incubator/lcf/trunk: CHANGES.txt
framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
Author: kwright
Date: Sun Oct 16 14:26:14 2011
New Revision: 1184833
URL: http://svn.apache.org/viewvc?rev=1184833&view=rev
Log:
Fix for CONNECTORS-187. Service interruptions in worker thread during document deletion now cause documents to be requeued.
Modified:
incubator/lcf/trunk/CHANGES.txt
incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
Modified: incubator/lcf/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/CHANGES.txt?rev=1184833&r1=1184832&r2=1184833&view=diff
==============================================================================
--- incubator/lcf/trunk/CHANGES.txt (original)
+++ incubator/lcf/trunk/CHANGES.txt Sun Oct 16 14:26:14 2011
@@ -3,6 +3,10 @@ $Id$
======================= 0.4-dev =====================
+CONNECTORS-187: Fix how the worker threads handle document deletion
+service interruptions.
+(Karl Wright)
+
CONNECTORS-271: Wiki connector now has end-user documentation.
(Karl Wright)
Modified: incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL: http://svn.apache.org/viewvc/incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1184833&r1=1184832&r2=1184833&view=diff
==============================================================================
--- incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java (original)
+++ incubator/lcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java Sun Oct 16 14:26:14 2011
@@ -73,29 +73,29 @@ public class WorkerThread extends Thread
IRepositoryConnectionManager connMgr = RepositoryConnectionManagerFactory.make(threadContext);
IOutputConnectionManager outputMgr = OutputConnectionManagerFactory.make(threadContext);
- ArrayList fetchList = new ArrayList();
- HashMap versionMap = new HashMap();
- ArrayList finishList = new ArrayList();
- HashMap idHashIndexMap = new HashMap();
+ List<DocumentToProcess> fetchList = new ArrayList<DocumentToProcess>();
+ Map<String,String> versionMap = new HashMap<String,String>();
+ List<QueuedDocument> finishList = new ArrayList<QueuedDocument>();
+ Map<String,Integer> idHashIndexMap = new HashMap<String,Integer>();
// This is the place we accumulate documents to be deleted via the ingester; each member is a
// document ID string.
- ArrayList ingesterDeleteList = new ArrayList();
- ArrayList ingesterDeleteListUnhashed = new ArrayList();
+ List<String> ingesterDeleteList = new ArrayList<String>();
+ List<String> ingesterDeleteListUnhashed = new ArrayList<String>();
// This is where we accumulate the document QueuedDocuments to be deleted from the job queue.
// These MUST be cleaned up after the ingester delete is called, in order to guarantee
// cleanup happens! Also note that there will be items in here that are NOT in the ingester
// delete list...
- ArrayList jobmanagerDeleteList = new ArrayList();
+ List<QueuedDocument> jobmanagerDeleteList = new ArrayList<QueuedDocument>();
// This is where we store document ID strings of documents that need to be noted as having
// been checked.
- ArrayList ingesterCheckList = new ArrayList();
+ List<String> ingesterCheckList = new ArrayList<String>();
// Temporary list
- ArrayList tempIDHashList = new ArrayList();
- ArrayList tempIDList = new ArrayList();
+ List<String> tempIDHashList = new ArrayList<String>();
+ List<String> tempIDList = new ArrayList<String>();
// Loop
while (true)
@@ -224,7 +224,7 @@ public class WorkerThread extends Thread
if (results[z] == false)
{
// Chuck it
- currentVersions[((Integer)idHashIndexMap.get(currentDocIDHashArray[z])).intValue()] = null;
+ currentVersions[idHashIndexMap.get(currentDocIDHashArray[z]).intValue()] = null;
}
else
{
@@ -239,8 +239,8 @@ public class WorkerThread extends Thread
z = 0;
while (z < currentDocIDHashArray.length)
{
- currentDocIDHashArray[z] = (String)tempIDHashList.get(z);
- currentDocIDArray[z] = (String)tempIDList.get(z);
+ currentDocIDHashArray[z] = tempIDHashList.get(z);
+ currentDocIDArray[z] = tempIDList.get(z);
z++;
}
}
@@ -254,7 +254,7 @@ public class WorkerThread extends Thread
while (z < oldVersionStringArray.length)
{
String idHashValue = currentDocIDHashArray[z];
- QueuedDocument qd = qds.getDocument(((Integer)idHashIndexMap.get(idHashValue)).intValue());
+ QueuedDocument qd = qds.getDocument(idHashIndexMap.get(idHashValue).intValue());
DocumentIngestStatus dis = qd.getLastIngestedStatus();
if (dis == null)
oldVersionStringArray[z] = null;
@@ -540,11 +540,11 @@ public class WorkerThread extends Thread
i = 0;
while (i < fetchList.size())
{
- DocumentToProcess dToP = (DocumentToProcess)fetchList.get(i);
+ DocumentToProcess dToP = fetchList.get(i);
DocumentDescription dd = dToP.getDocument().getDocumentDescription();
processIDs[i] = dd.getDocumentIdentifier();
processIDHashes[i] = dd.getDocumentIdentifierHash();
- versions[i] = (String)versionMap.get(dd.getDocumentIdentifierHash());
+ versions[i] = versionMap.get(dd.getDocumentIdentifierHash());
scanOnly[i] = dToP.getScanOnly();
i++;
}
@@ -596,7 +596,7 @@ public class WorkerThread extends Thread
i = 0;
while (i < finishList.size())
{
- QueuedDocument qd = (QueuedDocument)finishList.get(i++);
+ QueuedDocument qd = finishList.get(i++);
DocumentDescription dd = qd.getDocumentDescription();
if (dd.getFailTime() != -1L && dd.getFailTime() < e.getRetryTime() ||
dd.getFailRetryCount() == 0)
@@ -647,7 +647,7 @@ public class WorkerThread extends Thread
while (i < checkIDs.length)
{
checkClasses[i] = connectionName;
- checkIDs[i] = (String)ingesterCheckList.get(i);
+ checkIDs[i] = ingesterCheckList.get(i);
i++;
}
ingester.documentCheckMultiple(outputName,checkClasses,checkIDs,currentTime);
@@ -682,7 +682,7 @@ public class WorkerThread extends Thread
i = 0;
while (i < finishList.size())
{
- QueuedDocument qd = (QueuedDocument)finishList.get(i);
+ QueuedDocument qd = finishList.get(i);
recrawlDocs[i] = qd.getDocumentDescription();
String documentID = recrawlDocs[i].getDocumentIdentifier();
@@ -747,12 +747,12 @@ public class WorkerThread extends Thread
case IJobDescription.TYPE_SPECIFIED:
{
// Separate the ones we actually finished from the ones we need to requeue because they were aborted
- ArrayList completedList = new ArrayList();
- ArrayList abortedList = new ArrayList();
+ List<DocumentDescription> completedList = new ArrayList<DocumentDescription>();
+ List<DocumentDescription> abortedList = new ArrayList<DocumentDescription>();
i = 0;
while (i < finishList.size())
{
- QueuedDocument qd = (QueuedDocument)finishList.get(i++);
+ QueuedDocument qd = finishList.get(i++);
DocumentDescription dd = qd.getDocumentDescription();
if (abortSet.get(dd.getDocumentIdentifier()) != null)
{
@@ -775,7 +775,7 @@ public class WorkerThread extends Thread
i = 0;
while (i < docDescriptions.length)
{
- docDescriptions[i] = (DocumentDescription)abortedList.get(i);
+ docDescriptions[i] = abortedList.get(i);
recheckTimeArray[i] = new Long(0L);
actionArray[i] = IJobManager.ACTION_RESCAN;
i++;
@@ -807,7 +807,7 @@ public class WorkerThread extends Thread
i = 0;
while (i < finishList.size())
{
- QueuedDocument qd = (QueuedDocument)finishList.get(i++);
+ QueuedDocument qd = finishList.get(i++);
qd.setProcessed();
}
@@ -956,8 +956,8 @@ public class WorkerThread extends Thread
*@param ingesterDeleteList is a list of document id's to delete.
*/
protected static void processDeleteLists(String outputName, IRepositoryConnector connector,
- IRepositoryConnection connection, IJobManager jobManager, ArrayList jobmanagerDeleteList,
- IIncrementalIngester ingester, ArrayList ingesterDeleteList, ArrayList ingesterDeleteListUnhashed,
+ IRepositoryConnection connection, IJobManager jobManager, List<QueuedDocument> jobmanagerDeleteList,
+ IIncrementalIngester ingester, List<String> ingesterDeleteList, List<String> ingesterDeleteListUnhashed,
Long jobID, String[] legalLinkTypes, OutputActivity ingestLogger,
int hopcountMethod, QueueTracker queueTracker, long currentTime)
throws ManifoldCFException
@@ -974,38 +974,60 @@ public class WorkerThread extends Thread
while (i < ingesterDeleteList.size())
{
deleteClasses[i] = connectionName;
- deleteIDs[i] = (String)ingesterDeleteList.get(i);
+ deleteIDs[i] = ingesterDeleteList.get(i);
i++;
}
- // Delete retry loop: Keep trying until we succeed (if ever).
- // This is reasonable behavior because a ServiceInterruption due to the ingestion API being down will block most progress anyhow;
- // it's reasonable therefore to just keep waiting until service is restored.
- while (true)
+
+ // Try to delete the documents via the output connection.
+ try
{
- try
+ ingester.documentDeleteMultiple(outputName,deleteClasses,deleteIDs,ingestLogger);
+ }
+ catch (ServiceInterruption e)
+ {
+ // It looks like the output connection is not currently functioning, so we need to requeue instead of deleting
+ // those documents that could not be removed.
+ List<QueuedDocument> newJobmanagerDeleteList = new ArrayList<QueuedDocument>();
+ List<QueuedDocument> newRequeueList = new ArrayList<QueuedDocument>();
+
+ Map<String,String> ingesterMap = new HashMap<String,String>();
+ for (int j = 0 ; j < ingesterDeleteList.size() ; j++)
{
- ingester.documentDeleteMultiple(outputName,deleteClasses,deleteIDs,ingestLogger);
- break;
+ String id = ingesterDeleteList.get(j);
+ ingesterMap.put(id,id);
}
- catch (ServiceInterruption e)
+ for (int j = 0 ; j < jobmanagerDeleteList.size() ; j++)
{
- long amt = e.getRetryTime();
- long now = System.currentTimeMillis();
- long waittime = amt-now;
- if (waittime <= 0L)
- waittime = 300000L;
- try
- {
- ManifoldCF.sleep(waittime);
- }
- catch (InterruptedException e2)
- {
- throw new ManifoldCFException("Interrupted: "+e2.getMessage(),e2,ManifoldCFException.INTERRUPTED);
- }
+ QueuedDocument qd = jobmanagerDeleteList.get(j);
+ DocumentDescription dd = qd.getDocumentDescription();
+ String documentIdentifier = dd.getDocumentIdentifier();
+ if (ingesterMap.get(documentIdentifier) != null)
+ newRequeueList.add(qd);
+ else
+ newJobmanagerDeleteList.add(qd);
}
+
+ // Requeue those that are supposed to be requeued
+ requeueDocuments(jobManager,newRequeueList,e.getRetryTime(),e.getFailTime(),
+ e.getFailRetryCount());
+
+ // Process the ones that are just new job queue changes
+ jobmanagerDeleteList = newJobmanagerDeleteList;
}
}
-
+
+ // Delete from the job queue
+ processJobQueueDeletions(jobmanagerDeleteList,connector,connection,
+ jobManager,jobID,legalLinkTypes,hopcountMethod,queueTracker,currentTime);
+ }
+
+ /** Process job queue deletions. Either the indexer has already been updated, or it is not necessary to update it.
+ */
+ protected static void processJobQueueDeletions(List<QueuedDocument> jobmanagerDeleteList,
+ IRepositoryConnector connector, IRepositoryConnection connection, IJobManager jobManager,
+ Long jobID, String[] legalLinkTypes, int hopcountMethod, QueueTracker queueTracker, long currentTime)
+ throws ManifoldCFException
+ {
// Now, do the document queue cleanup for deletions.
if (jobmanagerDeleteList.size() > 0)
{
@@ -1013,7 +1035,7 @@ public class WorkerThread extends Thread
int i = 0;
while (i < deleteDescriptions.length)
{
- QueuedDocument qd = (QueuedDocument)jobmanagerDeleteList.get(i);
+ QueuedDocument qd = jobmanagerDeleteList.get(i);
deleteDescriptions[i] = qd.getDocumentDescription();
i++;
}
@@ -1028,7 +1050,7 @@ public class WorkerThread extends Thread
i = 0;
while (i < jobmanagerDeleteList.size())
{
- QueuedDocument qd = (QueuedDocument)jobmanagerDeleteList.get(i++);
+ QueuedDocument qd = jobmanagerDeleteList.get(i++);
qd.setProcessed();
}
}
@@ -1041,7 +1063,7 @@ public class WorkerThread extends Thread
*@param failTime is the time beyond which retries lead to hard failure.
*@param failCount is the number of retries allowed until hard failure.
*/
- protected static void requeueDocuments(IJobManager jobManager, ArrayList requeueList, long retryTime, long failTime, int failCount)
+ protected static void requeueDocuments(IJobManager jobManager, List<QueuedDocument> requeueList, long retryTime, long failTime, int failCount)
throws ManifoldCFException
{
if (requeueList.size() > 0)
@@ -1051,7 +1073,7 @@ public class WorkerThread extends Thread
int i = 0;
while (i < requeueDocs.length)
{
- QueuedDocument qd = (QueuedDocument)requeueList.get(i);
+ QueuedDocument qd = requeueList.get(i);
DocumentDescription dd = qd.getDocumentDescription();
requeueDocs[i] = dd;
i++;
@@ -1062,7 +1084,7 @@ public class WorkerThread extends Thread
i = 0;
while (i < requeueList.size())
{
- QueuedDocument qd = (QueuedDocument)requeueList.get(i++);
+ QueuedDocument qd = requeueList.get(i++);
qd.setProcessed();
}
}