You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2015/05/31 13:17:58 UTC

svn commit: r1682720 - in /manifoldcf/branches/dev_1x: ./ framework/ framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java

Author: kwright
Date: Sun May 31 11:17:58 2015
New Revision: 1682720

URL: http://svn.apache.org/r1682720
Log:
Enhance worker thread logging

Modified:
    manifoldcf/branches/dev_1x/   (props changed)
    manifoldcf/branches/dev_1x/framework/   (props changed)
    manifoldcf/branches/dev_1x/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java

Propchange: manifoldcf/branches/dev_1x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun May 31 11:17:58 2015
@@ -124,4 +124,4 @@
 /manifoldcf/branches/CONNECTORS-981:1605049-1605773
 /manifoldcf/branches/CONNECTORS-989:1611600-1612101
 /manifoldcf/branches/CONNECTORS-990:1610284-1610707
-/manifoldcf/trunk

 1660258,1660276,1661454,1665848,1666160,1666781,1666820,1668312,1669100,1669238,1669487,1669523,1669586,1669660,1670614,1670625,1670715,1671496,1672169,1672301,1672616,1672737,1673559,1673573,1673579,1673722,1675781,1675898,1676094,1676882,1676910,1678300,1678329,1678471,1678551,1679730,1679826,1681390,1681735,1682232,1682252,1682410
+/manifoldcf/trunk
 4155,1634188,1634202,1634264,1634373,1634530,1634688,1634850,1634857,1635103,1635116,1635421,1635438,1635478,1635481,1635484,1635490,1635809,1635939,1636146,1636167,1636180,1636207,1636215,1636232,1636334,1636519,1636570,1636684,1636940,1637011,1637310,1637350,1637364,1637373,1637378,1639259,1639593,1639600,1640018,1640101,1640199,1640204,1640208,1640314,1640319,1640749,1640772,1640805,1640888,1640925,1640941-1640942,1641222,1641328,1641557,1641559,1641629,1641633,1641724,1641754,1641911,1642163,1642255,1642318,1642531,1642650,1642658,1642673,1642716,1644197,1644399,1644538,1644920,1644931,1646317,1646397,1646403,1646408,1646640,1646947,1647574,1647585,1647608,1648686,1648976,1649201,1649203,1649529,1649605,1649628,1649794,1650351,1650722,1650741-1650742,1650745,1650747,1650911,1650954,1651332,1651539,1651907,1651921,1652071,1652974,1653175,1653899,1654651,1655205,1655261,1655264,1655377,1655411,1655618,1655914,1657346,1657443,1658004,1658036,1658121,1658155,1658188,1658463,1658476,
 1660258,1660276,1661454,1665848,1666160,1666781,1666820,1668312,1669100,1669238,1669487,1669523,1669586,1669660,1670614,1670625,1670715,1671496,1672169,1672301,1672616,1672737,1673559,1673573,1673579,1673722,1675781,1675898,1676094,1676882,1676910,1678300,1678329,1678471,1678551,1679730,1679826,1681390,1681735,1682232,1682252,1682410,1682719

Propchange: manifoldcf/branches/dev_1x/framework/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun May 31 11:17:58 2015
@@ -113,4 +113,4 @@
 /manifoldcf/branches/CONNECTORS-989/framework:1611600-1612101
 /manifoldcf/branches/CONNECTORS-990/framework:1610284-1610707
 /manifoldcf/trunk:1629122
-/manifoldcf/trunk/framework
 641724,1641911,1642163,1642255,1642318,1644197,1644399,1646317,1646397,1646403,1646640,1647574,1647585,1647608,1649605,1650351,1650911,1651332,1651539,1651921,1655377,1655411,1657346,1658004,1658036,1660258,1660276,1669487,1670614,1672616,1672737,1676094,1681390,1681735,1682232,1682252,1682410
+/manifoldcf/trunk/framework
 641724,1641911,1642163,1642255,1642318,1644197,1644399,1646317,1646397,1646403,1646640,1647574,1647585,1647608,1649605,1650351,1650911,1651332,1651539,1651921,1655377,1655411,1657346,1658004,1658036,1660258,1660276,1669487,1670614,1672616,1672737,1676094,1681390,1681735,1682232,1682252,1682410,1682719

Modified: manifoldcf/branches/dev_1x/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/dev_1x/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1682720&r1=1682719&r2=1682720&view=diff
==============================================================================
--- manifoldcf/branches/dev_1x/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java (original)
+++ manifoldcf/branches/dev_1x/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java Sun May 31 11:17:58 2015
@@ -139,7 +139,14 @@ public class WorkerThread extends Thread
               continue;
 
             if (Logging.threads.isDebugEnabled())
-              Logging.threads.debug("Worker thread received "+Integer.toString(qds.getCount())+" documents");
+            {
+              StringBuilder sb = new StringBuilder();
+              for (int z = 0; z < qds.getCount(); z++)
+              {
+                sb.append(qds.getDocument(z).getDocumentDescription().getID()).append(" ");
+              }
+              Logging.threads.debug("Worker thread processing documents: "+sb);
+            }
 
             // Build a basic pipeline specification right off; we need it whenever
             // we interact with Incremental Ingester.
@@ -201,13 +208,15 @@ public class WorkerThread extends Thread
                 if (legalLinkTypes == null)
                 {
                   // Failure here puts all remaining documents into rescan list
+                  if (Logging.threads.isDebugEnabled())
+                    Logging.threads.debug(" Moving "+makeListString(activeDocuments)+" to rescanList");
                   moveList(activeDocuments,rescanList);
                 }
               }
 
               if (Logging.threads.isDebugEnabled())
-                Logging.threads.debug(" Post-linktype document count is "+Integer.toString(activeDocuments.size()));
-              
+                Logging.threads.debug("Post-relationship document count is "+Integer.toString(activeDocuments.size()));
+
               // Do the hopcount checks, if any.  This will iteratively reduce the viable list of
               // document identifiers in need of having their versions fetched.
               if (legalLinkTypes != null && activeDocuments.size() > 0)
@@ -244,6 +253,8 @@ public class WorkerThread extends Thread
                 {
                   if (overallResults[i] == false)
                   {
+                    if (Logging.threads.isDebugEnabled())
+                      Logging.threads.debug(" Adding "+activeDocuments.get(i).getDocumentDescription().getID()+" to hopcountremovelist");
                     hopcountremoveList.add(activeDocuments.get(i));
                   }
                   else
@@ -272,7 +283,11 @@ public class WorkerThread extends Thread
                 if (connector == null)
                 {
                   // Failure here puts all remaining documents into rescan list
+                  if (Logging.threads.isDebugEnabled())
+                    Logging.threads.debug(" Moving "+makeListString(activeDocuments)+" to rescanList");
                   moveList(activeDocuments,rescanList);
+                  if (Logging.threads.isDebugEnabled())
+                    Logging.threads.debug(" Moving "+makeListString(hopcountremoveList)+" to rescanList");
                   moveList(hopcountremoveList,rescanList);
                 }
               }
@@ -321,18 +336,23 @@ public class WorkerThread extends Thread
                         }
                         else
                         {
+                          if (Logging.threads.isDebugEnabled())
+                            Logging.threads.debug(" Adding "+qd.getDocumentDescription().getID()+" to requeueList because of output service interruption");
                           requeueList.add(qd);
                         }
                       }
                       else
                       {
+                        if (Logging.threads.isDebugEnabled())
+                          Logging.threads.debug(" Adding "+qd.getDocumentDescription().getID()+" to requeueList because of output service interruption");
                         requeueList.add(qd);
                       }
                     }
                       
                     requeueDocuments(jobManager,requeueList,e.getRetryTime(),e.getFailTime(),
                       e.getFailRetryCount());
-                      
+                    
+                    Logging.threads.debug(" Clearing active documents list due to output service interruption");
                     activeDocuments.clear();
                     pipelineSpecification = null;
                   }
@@ -374,7 +394,7 @@ public class WorkerThread extends Thread
                     try
                     {
                       if (Logging.threads.isDebugEnabled())
-                        Logging.threads.debug("Worker thread about to process "+Integer.toString(documentIDs.length)+" documents");
+                        Logging.threads.debug("Worker thread about to process "+makeListString(activeDocuments));
 
                       // Now, process in bulk -- catching and handling ServiceInterruptions
                       ServiceInterruption serviceInterruption = null;
@@ -446,10 +466,14 @@ public class WorkerThread extends Thread
                           // We want to give up immediately on this one, and just requeue it for immediate reprocessing (pending its prereqs being all met).
                           // Add to the finish list, so it gets requeued.  Because the document is already marked as aborted, this should be enough to cause an
                           // unconditional requeue.
+                          if (Logging.threads.isDebugEnabled())
+                            Logging.threads.debug(" Adding "+qd.getDocumentDescription().getID()+" to finishList");
                           finishList.add(qd);
                         }
                         else if (activity.wasDocumentDeleted(qd.getDocumentDescription().getDocumentIdentifier()))
                         {
+                          if (Logging.threads.isDebugEnabled())
+                            Logging.threads.debug(" Adding "+qd.getDocumentDescription().getID()+" to deleteList");
                           deleteList.add(qd);
                         }
                         else if (serviceInterruption != null)
@@ -467,6 +491,8 @@ public class WorkerThread extends Thread
                             {
                               // Make sure that the job aborts.
                               abortOnFail = new ManifoldCFException("Repeated service interruptions - failure processing document"+((serviceInterruption.getCause()!=null)?": "+serviceInterruption.getCause().getMessage():""),serviceInterruption.getCause());
+                              if (Logging.threads.isDebugEnabled())
+                                Logging.threads.debug(" Adding "+qd.getDocumentDescription().getID()+" to rescanList due to service interruption");
                               rescanList.add(qd);
                             }
                             else
@@ -475,21 +501,31 @@ public class WorkerThread extends Thread
                               // We want this particular document to be not included in the
                               // reprocessing.  Therefore, we do the same thing as we would
                               // if we got back a null version.
+                              if (Logging.threads.isDebugEnabled())
+                                Logging.threads.debug(" Adding "+qd.getDocumentDescription().getID()+" to deleteList due to service interruption");
                               deleteList.add(qd);
                             }
                           }
                           else
                           {
                             // Not a hard failure.  Requeue.
+                            if (Logging.threads.isDebugEnabled())
+                              Logging.threads.debug(" Adding "+qd.getDocumentDescription().getID()+" to requeueList");
                             requeueList.add(qd);
                           }
                         }
                         else
+                        {
+                          if (Logging.threads.isDebugEnabled())
+                            Logging.threads.debug(" Adding "+qd.getDocumentDescription().getID()+" to finishList");
                           finishList.add(qd);
+                        }
                         
                         // Note whether the document was untouched; if so, update it
                         if (!activity.wasDocumentTouched(qd.getDocumentDescription().getDocumentIdentifier()))
                         {
+                          if (Logging.threads.isDebugEnabled())
+                            Logging.threads.debug(" Adding "+qd.getDocumentDescription().getID()+" to ingesterCheckList");
                           ingesterCheckList.add(qd.getDocumentDescription().getDocumentIdentifierHash());
                         }
                       }
@@ -498,6 +534,8 @@ public class WorkerThread extends Thread
                       if (serviceInterruption != null)
                       {
                         // Requeue the documents we've identified as needing to be repeated
+                        if (Logging.threads.isDebugEnabled())
+                          Logging.threads.debug("Requeuing documents "+makeListString(requeueList));
                         requeueDocuments(jobManager,requeueList,serviceInterruption.getRetryTime(),serviceInterruption.getFailTime(),
                           serviceInterruption.getFailRetryCount());
                       }
@@ -521,6 +559,9 @@ public class WorkerThread extends Thread
                       // Process the finish list!
                       if (finishList.size() > 0)
                       {
+                        if (Logging.threads.isDebugEnabled())
+                          Logging.threads.debug("Finishing documents "+makeListString(finishList));
+
                         // "Finish" the documents (removing unneeded carrydown info, and compute hopcounts).
                         // This can ONLY be done on fully-completed documents; everything else should be left in a dangling
                         // state (which we know is OK because it will be fixed the next time the document is attempted).
@@ -531,6 +572,8 @@ public class WorkerThread extends Thread
                           documentIDHashes[k++] = qd.getDocumentDescription().getDocumentIdentifierHash();
                         }
                         DocumentDescription[] requeueCandidates = jobManager.finishDocuments(job.getID(),legalLinkTypes,documentIDHashes,job.getHopcountMode());
+                        if (Logging.threads.isDebugEnabled())
+                          Logging.threads.debug(" Requeueing documents due to carrydown "+makeListString(requeueCandidates));
                         ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,connector,connection,rt,currentTime);
                         
                         // In both job types, we have to go through the finishList to figure out what to do with the documents.
@@ -613,6 +656,8 @@ public class WorkerThread extends Thread
                               }
                             }
 
+                            if (Logging.threads.isDebugEnabled())
+                              Logging.threads.debug(" Requeuing "+makeListString(recrawlDocs));
                             jobManager.requeueDocumentMultiple(recrawlDocs,recheckTimeArray,actionArray);
 
                           }
@@ -650,6 +695,8 @@ public class WorkerThread extends Thread
                                 actionArray[i] = IJobManager.ACTION_RESCAN;
                               }
 
+                              if (Logging.threads.isDebugEnabled())
+                                Logging.threads.debug(" Requeuing "+makeListString(docDescriptions));
                               jobManager.requeueDocumentMultiple(docDescriptions,recheckTimeArray,actionArray);
                             }
 
@@ -662,6 +709,8 @@ public class WorkerThread extends Thread
                                 docDescriptions[i] = completedList.get(i);
                               }
 
+                              if (Logging.threads.isDebugEnabled())
+                                Logging.threads.debug(" Marking completed "+makeListString(docDescriptions));
                               jobManager.markDocumentCompletedMultiple(docDescriptions);
                             }
                           }
@@ -691,11 +740,15 @@ public class WorkerThread extends Thread
                   }
                   
                   // Now, handle the delete list
+                  if (Logging.threads.isDebugEnabled())
+                    Logging.threads.debug("Deleting "+makeListString(deleteList));
                   processDeleteLists(pipelineConnections,connector,connection,jobManager,
                     deleteList,ingester,
                     job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),rt,currentTime);
 
                   // Handle hopcount removal
+                  if (Logging.threads.isDebugEnabled())
+                    Logging.threads.debug("Hopcount removal "+makeListString(hopcountremoveList));
                   processHopcountRemovalLists(pipelineConnections,connector,connection,jobManager,
                     hopcountremoveList,ingester,
                     job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),rt,currentTime);
@@ -709,6 +762,8 @@ public class WorkerThread extends Thread
               }
               
               // Handle rescanning
+              if (Logging.threads.isDebugEnabled())
+                Logging.threads.debug("Rescanning documents "+makeListString(rescanList));
               for (QueuedDocument qd : rescanList)
               {
                 jobManager.resetDocument(qd.getDocumentDescription(),0L,IJobManager.ACTION_RESCAN,-1L,-1);
@@ -829,6 +884,28 @@ public class WorkerThread extends Thread
     return true;
   }
 
+  protected static String makeListString(List<QueuedDocument> sourceList)
+  {
+    StringBuilder sb = new StringBuilder("{");
+    for (QueuedDocument qd : sourceList)
+    {
+      sb.append(qd.getDocumentDescription().getID()).append(" ");
+    }
+    sb.append("}");
+    return sb.toString();
+  }
+
+  protected static String makeListString(DocumentDescription[] sourceList)
+  {
+    StringBuilder sb = new StringBuilder("{");
+    for (DocumentDescription dd : sourceList)
+    {
+      sb.append(dd.getID()).append(" ");
+    }
+    sb.append("}");
+    return sb.toString();
+  }
+  
   protected static void moveList(List<QueuedDocument> sourceList, List<QueuedDocument> targetList)
   {
     for (int i = 0; i < sourceList.size(); i++)