Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/07/14 20:11:23 UTC

svn commit: r1610476 - in /manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler: connectors/BaseRepositoryConnector.java system/WorkerThread.java

Author: kwright
Date: Mon Jul 14 18:11:22 2014
New Revision: 1610476

URL: http://svn.apache.org/r1610476
Log:
Fix compilation errors

Modified:
    manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/connectors/BaseRepositoryConnector.java
    manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java

Modified: manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/connectors/BaseRepositoryConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/connectors/BaseRepositoryConnector.java?rev=1610476&r1=1610475&r2=1610476&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/connectors/BaseRepositoryConnector.java (original)
+++ manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/connectors/BaseRepositoryConnector.java Mon Jul 14 18:11:22 2014
@@ -364,7 +364,7 @@ public abstract class BaseRepositoryConn
       for (int i = 0; i < documentIdentifiers.length; i++)
       {
         String documentIdentifier = documentIdentifiers[i];
-        VersionContent vc = dv.getDocumentVersion(documentIdentifier);
+        VersionContext vc = dv.getDocumentVersion(documentIdentifier);
         if (vc != null)
         {
           if (dv.isAlwaysRefetch(documentIdentifier) || activities.checkDocumentNeedsReindexing(documentIdentifier,vc.getVersionString()))
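
For reference, the hunk above is purely a type-name fix: VersionContent does not exist, while VersionContext is the type returned by getDocumentVersion() and carries the version string used in the reindexing check. A minimal self-contained sketch of that check follows; DocumentVersions and Activities are stand-in stubs declared only so the sketch compiles, modeling just the three calls visible in the hunk.

// Stand-in stubs: only the calls used in the hunk above are modeled.
interface VersionContext
{
  String getVersionString();
}

interface DocumentVersions
{
  VersionContext getDocumentVersion(String documentIdentifier);
  boolean isAlwaysRefetch(String documentIdentifier);
}

interface Activities
{
  boolean checkDocumentNeedsReindexing(String documentIdentifier, String versionString);
}

class ReindexCheckSketch
{
  // A document is (re)processed when the connector marks it always-refetch,
  // or when its version string no longer matches what was last indexed.
  static boolean needsProcessing(DocumentVersions dv, Activities activities,
    String documentIdentifier)
  {
    VersionContext vc = dv.getDocumentVersion(documentIdentifier);
    if (vc != null)
    {
      return dv.isAlwaysRefetch(documentIdentifier)
        || activities.checkDocumentNeedsReindexing(documentIdentifier, vc.getVersionString());
    }
    // The hunk does not show the null branch; this sketch treats null as "no".
    return false;
  }
}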

Modified: manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1610476&r1=1610475&r2=1610476&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java (original)
+++ manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java Mon Jul 14 18:11:22 2014
@@ -171,7 +171,6 @@ public class WorkerThread extends Thread
             }
 
             // Clear out all of our disposition lists
-            fetchList.clear();
             finishList.clear();
             deleteList.clear();
             ingesterCheckList.clear();
@@ -283,6 +282,59 @@ public class WorkerThread extends Thread
                   // Check for interruption before we start fetching
                   if (Thread.currentThread().isInterrupted())
                     throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
+                  
+                  // We need first to assemble an IPipelineSpecificationWithVersions object for each document we're going to process.
+                  // We put this in a map so it can be looked up by document identifier.
+                  // Create a full PipelineSpecification, including description strings.  (This is per-job still, but can throw ServiceInterruptions, so we do it in here.)
+                  IPipelineSpecification pipelineSpecification;
+                  try
+                  {
+                    pipelineSpecification = new PipelineSpecification(pipelineSpecificationBasic,job,ingester);
+                  }
+                  catch (ServiceInterruption e)
+                  {
+                    // Handle service interruption from pipeline
+                    if (!e.jobInactiveAbort())
+                      Logging.jobs.warn("Service interruption reported for job "+
+                      job.getID()+" connection '"+job.getConnectionName()+"': "+
+                      e.getMessage());
+
+                    if (!e.jobInactiveAbort() && e.isAbortOnFail())
+                      abortOnFail = new ManifoldCFException("Repeated service interruptions - failure processing document"+((e.getCause()!=null)?": "+e.getCause().getMessage():""),e.getCause());
+
+                    // All documents get requeued, because we never got far enough to make distinctions.  All we have to decide
+                    // is whether to requeue or abort.
+                    List<QueuedDocument> requeueList = new ArrayList<QueuedDocument>();
+
+                    for (QueuedDocument qd : activeDocuments)
+                    {
+                      DocumentDescription dd = qd.getDocumentDescription();
+                      // Check for hard failure.  But no hard failure is possible if it's a job inactive abort.
+                      if (!e.jobInactiveAbort() && (dd.getFailTime() != -1L && dd.getFailTime() < e.getRetryTime() ||
+                        dd.getFailRetryCount() == 0))
+                      {
+                        // Treat this as a hard failure.
+                        if (e.isAbortOnFail())
+                        {
+                          rescanList.add(qd);
+                        }
+                        else
+                        {
+                          requeueList.add(qd);
+                        }
+                      }
+                      else
+                      {
+                        requeueList.add(qd);
+                      }
+                    }
+                      
+                    requeueDocuments(jobManager,requeueList,e.getRetryTime(),e.getFailTime(),
+                      e.getFailRetryCount());
+                      
+                    activeDocuments.clear();
+                    pipelineSpecification = null;
+                  }
 
                   if (activeDocuments.size() > 0)
                   {
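
The disposition rule in the catch block above reads as follows: a document is a hard failure once its fail window has expired (fail time recorded and earlier than the interruption's retry time) or its retry budget is exhausted, and never when the interruption is a job-inactive abort; hard failures go to the rescan list only when the interruption demands abort-on-fail, and everything else is requeued. A compact sketch of just that decision, with the DocumentDescription and ServiceInterruption accessors flattened into plain fields for illustration:

class DispositionSketch
{
  // Mirrors dd.getFailTime(), dd.getFailRetryCount(), e.getRetryTime(),
  // e.jobInactiveAbort(), and e.isAbortOnFail() from the block above.
  long failTime = -1L;     // -1L means no fail window recorded yet
  int failRetryCount = -1; // 0 means the retry budget is exhausted
  long retryTime;
  boolean jobInactiveAbort;
  boolean abortOnFail;

  // Same precedence as the catch block's condition: (fail time recorded and
  // expired) OR (no retries left), and never on a job-inactive abort.
  boolean isHardFailure()
  {
    return !jobInactiveAbort
      && ((failTime != -1L && failTime < retryTime) || failRetryCount == 0);
  }

  String disposition()
  {
    if (isHardFailure() && abortOnFail)
      return "rescan";
    return "requeue";
  }
}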
@@ -297,8 +349,7 @@ public class WorkerThread extends Thread
 
                     // Build the processActivity object
                     
-                    // We need first to assemble an IPipelineSpecificationWithVersions object for each document we're going to process.
-                    // We put this in a map so it can be looked up by document identifier.
+                    
                     Map<String,IPipelineSpecificationWithVersions> fetchPipelineSpecifications = new HashMap<String,IPipelineSpecificationWithVersions>();
                     String[] documentIDs = new String[activeDocuments.size()];
                     String[] documentIDHashes = new String[activeDocuments.size()];
@@ -331,7 +382,7 @@ public class WorkerThread extends Thread
                       // Now, process in bulk -- catching and handling ServiceInterruptions
                       try
                       {
-                        connector.processDocuments(documentIDs,existingVersions,activity,jobType,isDefaultAuthority);
+                        connector.processDocuments(documentIDs,existingVersions,job.getSpecification(),activity,jobType,isDefaultAuthority);
 
                         for (QueuedDocument qd : activeDocuments)
                         {
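
For connector implementations, the one-line change above means the job's Specification now travels with each processDocuments() call instead of being obtained out of band. A self-contained sketch of the receiving side; the parameter order is inferred from the call site in the hunk, and the empty interfaces are stand-ins for the framework types of the same names so the sketch compiles on its own.

interface IExistingVersions {}
interface Specification {}
interface IProcessActivity {}

class SketchConnector
{
  // Signature inferred from the call site above; the real method also
  // declares the framework's checked exceptions.
  public void processDocuments(String[] documentIdentifiers,
    IExistingVersions statuses, Specification spec,
    IProcessActivity activities, int jobType, boolean isDefaultAuthority)
  {
    for (String documentIdentifier : documentIdentifiers)
    {
      // Per-document work: consult spec for what to extract and report
      // fetch/index decisions through activities.
    }
  }
}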
@@ -394,12 +445,12 @@ public class WorkerThread extends Thread
                         // will abort the current job.
 
                         deleteList.clear();
-                        ArrayList requeueList = new ArrayList();
+                        List<QueuedDocument> requeueList = new ArrayList<QueuedDocument>();
 
                         Set<String> fetchDocuments = new HashSet<String>();
                         for (QueuedDocument qd : activeDocuments)
                         {
-                          fetchDocuments.add(qd.getDocument().getDocumentDescription().getDocumentIdentifierHash());
+                          fetchDocuments.add(qd.getDocumentDescription().getDocumentIdentifierHash());
                         }
                         List<QueuedDocument> newFinishList = new ArrayList<QueuedDocument>();
                         for (int i = 0; i < finishList.size(); i++)
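
The final hunk fixes two more compile-level issues: requeueList becomes a typed List<QueuedDocument>, matching the requeueDocuments() call introduced earlier in this commit, and the identifier hash is read from QueuedDocument's own getDocumentDescription() accessor rather than through a getDocument() indirection. The surrounding code builds a set of fetched identifier hashes and then rebuilds finishList against it; below is a self-contained sketch of that pattern, with both classes stubbed down to the two accessors the hunk uses. The hunk is truncated, so the choice to keep (rather than drop) matching hashes is this sketch's assumption, not the commit's.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class FinishListFilterSketch
{
  // Minimal stubs exposing only the accessors used in the hunk above.
  static class DocumentDescription
  {
    private final String hash;
    DocumentDescription(String hash) { this.hash = hash; }
    String getDocumentIdentifierHash() { return hash; }
  }

  static class QueuedDocument
  {
    private final DocumentDescription dd;
    QueuedDocument(DocumentDescription dd) { this.dd = dd; }
    DocumentDescription getDocumentDescription() { return dd; }
  }

  // Collect the hashes of the documents actually fetched, then keep only
  // the finish-list entries whose hash appears in that set.
  static List<QueuedDocument> rebuildFinishList(List<QueuedDocument> activeDocuments,
    List<QueuedDocument> finishList)
  {
    Set<String> fetchDocuments = new HashSet<String>();
    for (QueuedDocument qd : activeDocuments)
      fetchDocuments.add(qd.getDocumentDescription().getDocumentIdentifierHash());
    List<QueuedDocument> newFinishList = new ArrayList<QueuedDocument>();
    for (QueuedDocument qd : finishList)
    {
      if (fetchDocuments.contains(qd.getDocumentDescription().getDocumentIdentifierHash()))
        newFinishList.add(qd);
    }
    return newFinishList;
  }
}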