You are viewing a plain text version of this content. The canonical link for it was a hyperlink that has been lost in this plain-text rendering.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2015/05/31 13:17:05 UTC
svn commit: r1682719 -
/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
Author: kwright
Date: Sun May 31 11:17:04 2015
New Revision: 1682719
URL: http://svn.apache.org/r1682719
Log:
Enhance debugging for worker threads
Modified:
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1682719&r1=1682718&r2=1682719&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java Sun May 31 11:17:04 2015
@@ -139,7 +139,14 @@ public class WorkerThread extends Thread
continue;
if (Logging.threads.isDebugEnabled())
- Logging.threads.debug("Worker thread received "+Integer.toString(qds.getCount())+" documents");
+ {
+ StringBuilder sb = new StringBuilder();
+ for (int z = 0; z < qds.getCount(); z++)
+ {
+ sb.append(qds.getDocument(z).getDocumentDescription().getID()).append(" ");
+ }
+ Logging.threads.debug("Worker thread processing documents: "+sb);
+ }
// Build a basic pipeline specification right off; we need it whenever
// we interact with Incremental Ingester.
@@ -199,13 +206,15 @@ public class WorkerThread extends Thread
if (legalLinkTypes == null)
{
// Failure here puts all remaining documents into rescan list
+ if (Logging.threads.isDebugEnabled())
+ Logging.threads.debug(" Moving "+makeListString(activeDocuments)+" to rescanList");
moveList(activeDocuments,rescanList);
}
}
if (Logging.threads.isDebugEnabled())
- Logging.threads.debug(" Post-linktype document count is "+Integer.toString(activeDocuments.size()));
-
+ Logging.threads.debug("Post-relationship document count is "+Integer.toString(activeDocuments.size()));
+
// Do the hopcount checks, if any. This will iteratively reduce the viable list of
// document identifiers in need of having their versions fetched.
if (legalLinkTypes != null && activeDocuments.size() > 0)
@@ -242,6 +251,8 @@ public class WorkerThread extends Thread
{
if (overallResults[i] == false)
{
+ if (Logging.threads.isDebugEnabled())
+ Logging.threads.debug(" Adding "+activeDocuments.get(i).getDocumentDescription().getID()+" to hopcountremovelist");
hopcountremoveList.add(activeDocuments.get(i));
}
else
@@ -270,7 +281,11 @@ public class WorkerThread extends Thread
if (connector == null)
{
// Failure here puts all remaining documents into rescan list
+ if (Logging.threads.isDebugEnabled())
+ Logging.threads.debug(" Moving "+makeListString(activeDocuments)+" to rescanList");
moveList(activeDocuments,rescanList);
+ if (Logging.threads.isDebugEnabled())
+ Logging.threads.debug(" Moving "+makeListString(hopcountremoveList)+" to rescanList");
moveList(hopcountremoveList,rescanList);
}
}
@@ -319,18 +334,23 @@ public class WorkerThread extends Thread
}
else
{
+ if (Logging.threads.isDebugEnabled())
+ Logging.threads.debug(" Adding "+qd.getDocumentDescription().getID()+" to requeueList because of output service interruption");
requeueList.add(qd);
}
}
else
{
+ if (Logging.threads.isDebugEnabled())
+ Logging.threads.debug(" Adding "+qd.getDocumentDescription().getID()+" to requeueList because of output service interruption");
requeueList.add(qd);
}
}
requeueDocuments(jobManager,requeueList,e.getRetryTime(),e.getFailTime(),
e.getFailRetryCount());
-
+
+ Logging.threads.debug(" Clearing active documents list due to output service interruption");
activeDocuments.clear();
pipelineSpecification = null;
}
@@ -370,7 +390,7 @@ public class WorkerThread extends Thread
try
{
if (Logging.threads.isDebugEnabled())
- Logging.threads.debug("Worker thread about to process "+Integer.toString(documentIDs.length)+" documents");
+ Logging.threads.debug("Worker thread about to process "+makeListString(activeDocuments));
// Now, process in bulk -- catching and handling ServiceInterruptions
ServiceInterruption serviceInterruption = null;
@@ -442,10 +462,14 @@ public class WorkerThread extends Thread
// We want to give up immediately on this one, and just requeue it for immediate reprocessing (pending its prereqs being all met).
// Add to the finish list, so it gets requeued. Because the document is already marked as aborted, this should be enough to cause an
// unconditional requeue.
+ if (Logging.threads.isDebugEnabled())
+ Logging.threads.debug(" Adding "+qd.getDocumentDescription().getID()+" to finishList");
finishList.add(qd);
}
else if (activity.wasDocumentDeleted(qd.getDocumentDescription().getDocumentIdentifier()))
{
+ if (Logging.threads.isDebugEnabled())
+ Logging.threads.debug(" Adding "+qd.getDocumentDescription().getID()+" to deleteList");
deleteList.add(qd);
}
else if (serviceInterruption != null)
@@ -463,6 +487,8 @@ public class WorkerThread extends Thread
{
// Make sure that the job aborts.
abortOnFail = new ManifoldCFException("Repeated service interruptions - failure processing document"+((serviceInterruption.getCause()!=null)?": "+serviceInterruption.getCause().getMessage():""),serviceInterruption.getCause());
+ if (Logging.threads.isDebugEnabled())
+ Logging.threads.debug(" Adding "+qd.getDocumentDescription().getID()+" to rescanList due to service interruption");
rescanList.add(qd);
}
else
@@ -471,21 +497,31 @@ public class WorkerThread extends Thread
// We want this particular document to be not included in the
// reprocessing. Therefore, we do the same thing as we would
// if we got back a null version.
+ if (Logging.threads.isDebugEnabled())
+ Logging.threads.debug(" Adding "+qd.getDocumentDescription().getID()+" to deleteList due to service interruption");
deleteList.add(qd);
}
}
else
{
// Not a hard failure. Requeue.
+ if (Logging.threads.isDebugEnabled())
+ Logging.threads.debug(" Adding "+qd.getDocumentDescription().getID()+" to requeueList");
requeueList.add(qd);
}
}
else
+ {
+ if (Logging.threads.isDebugEnabled())
+ Logging.threads.debug(" Adding "+qd.getDocumentDescription().getID()+" to finishList");
finishList.add(qd);
+ }
// Note whether the document was untouched; if so, update it
if (!activity.wasDocumentTouched(qd.getDocumentDescription().getDocumentIdentifier()))
{
+ if (Logging.threads.isDebugEnabled())
+ Logging.threads.debug(" Adding "+qd.getDocumentDescription().getID()+" to ingesterCheckList");
ingesterCheckList.add(qd.getDocumentDescription().getDocumentIdentifierHash());
}
}
@@ -494,6 +530,8 @@ public class WorkerThread extends Thread
if (serviceInterruption != null)
{
// Requeue the documents we've identified as needing to be repeated
+ if (Logging.threads.isDebugEnabled())
+ Logging.threads.debug("Requeuing documents "+makeListString(requeueList));
requeueDocuments(jobManager,requeueList,serviceInterruption.getRetryTime(),serviceInterruption.getFailTime(),
serviceInterruption.getFailRetryCount());
}
@@ -517,6 +555,9 @@ public class WorkerThread extends Thread
// Process the finish list!
if (finishList.size() > 0)
{
+ if (Logging.threads.isDebugEnabled())
+ Logging.threads.debug("Finishing documents "+makeListString(finishList));
+
// "Finish" the documents (removing unneeded carrydown info, and compute hopcounts).
// This can ONLY be done on fully-completed documents; everything else should be left in a dangling
// state (which we know is OK because it will be fixed the next time the document is attempted).
@@ -527,6 +568,8 @@ public class WorkerThread extends Thread
documentIDHashes[k++] = qd.getDocumentDescription().getDocumentIdentifierHash();
}
DocumentDescription[] requeueCandidates = jobManager.finishDocuments(job.getID(),legalLinkTypes,documentIDHashes,job.getHopcountMode());
+ if (Logging.threads.isDebugEnabled())
+ Logging.threads.debug(" Requeueing documents due to carrydown "+makeListString(requeueCandidates));
ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,connector,connection,rt,currentTime);
// In both job types, we have to go through the finishList to figure out what to do with the documents.
@@ -609,6 +652,8 @@ public class WorkerThread extends Thread
}
}
+ if (Logging.threads.isDebugEnabled())
+ Logging.threads.debug(" Requeuing "+makeListString(recrawlDocs));
jobManager.requeueDocumentMultiple(recrawlDocs,recheckTimeArray,actionArray);
}
@@ -646,6 +691,8 @@ public class WorkerThread extends Thread
actionArray[i] = IJobManager.ACTION_RESCAN;
}
+ if (Logging.threads.isDebugEnabled())
+ Logging.threads.debug(" Requeuing "+makeListString(docDescriptions));
jobManager.requeueDocumentMultiple(docDescriptions,recheckTimeArray,actionArray);
}
@@ -658,6 +705,8 @@ public class WorkerThread extends Thread
docDescriptions[i] = completedList.get(i);
}
+ if (Logging.threads.isDebugEnabled())
+ Logging.threads.debug(" Marking completed "+makeListString(docDescriptions));
jobManager.markDocumentCompletedMultiple(docDescriptions);
}
}
@@ -687,11 +736,15 @@ public class WorkerThread extends Thread
}
// Now, handle the delete list
+ if (Logging.threads.isDebugEnabled())
+ Logging.threads.debug("Deleting "+makeListString(deleteList));
processDeleteLists(pipelineConnections,connector,connection,jobManager,
deleteList,ingester,
job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),rt,currentTime);
// Handle hopcount removal
+ if (Logging.threads.isDebugEnabled())
+ Logging.threads.debug("Hopcount removal "+makeListString(hopcountremoveList));
processHopcountRemovalLists(pipelineConnections,connector,connection,jobManager,
hopcountremoveList,ingester,
job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),rt,currentTime);
@@ -705,6 +758,8 @@ public class WorkerThread extends Thread
}
// Handle rescanning
+ if (Logging.threads.isDebugEnabled())
+ Logging.threads.debug("Rescanning documents "+makeListString(rescanList));
for (QueuedDocument qd : rescanList)
{
jobManager.resetDocument(qd.getDocumentDescription(),0L,IJobManager.ACTION_RESCAN,-1L,-1);
@@ -825,6 +880,28 @@ public class WorkerThread extends Thread
return true;
}
+ protected static String makeListString(List<QueuedDocument> sourceList)
+ {
+ StringBuilder sb = new StringBuilder("{");
+ for (QueuedDocument qd : sourceList)
+ {
+ sb.append(qd.getDocumentDescription().getID()).append(" ");
+ }
+ sb.append("}");
+ return sb.toString();
+ }
+
+ protected static String makeListString(DocumentDescription[] sourceList)
+ {
+ StringBuilder sb = new StringBuilder("{");
+ for (DocumentDescription dd : sourceList)
+ {
+ sb.append(dd.getID()).append(" ");
+ }
+ sb.append("}");
+ return sb.toString();
+ }
+
protected static void moveList(List<QueuedDocument> sourceList, List<QueuedDocument> targetList)
{
for (int i = 0; i < sourceList.size(); i++)