You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/07/14 20:11:23 UTC
svn commit: r1610476 - in
/manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler:
connectors/BaseRepositoryConnector.java system/WorkerThread.java
Author: kwright
Date: Mon Jul 14 18:11:22 2014
New Revision: 1610476
URL: http://svn.apache.org/r1610476
Log:
Fix compilation errors
Modified:
manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/connectors/BaseRepositoryConnector.java
manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
Modified: manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/connectors/BaseRepositoryConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/connectors/BaseRepositoryConnector.java?rev=1610476&r1=1610475&r2=1610476&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/connectors/BaseRepositoryConnector.java (original)
+++ manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/connectors/BaseRepositoryConnector.java Mon Jul 14 18:11:22 2014
@@ -364,7 +364,7 @@ public abstract class BaseRepositoryConn
for (int i = 0; i < documentIdentifiers.length; i++)
{
String documentIdentifier = documentIdentifiers[i];
- VersionContent vc = dv.getDocumentVersion(documentIdentifier);
+ VersionContext vc = dv.getDocumentVersion(documentIdentifier);
if (vc != null)
{
if (dv.isAlwaysRefetch(documentIdentifier) || activities.checkDocumentNeedsReindexing(documentIdentifier,vc.getVersionString()))
Modified: manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1610476&r1=1610475&r2=1610476&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java (original)
+++ manifoldcf/branches/CONNECTORS-990/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java Mon Jul 14 18:11:22 2014
@@ -171,7 +171,6 @@ public class WorkerThread extends Thread
}
// Clear out all of our disposition lists
- fetchList.clear();
finishList.clear();
deleteList.clear();
ingesterCheckList.clear();
@@ -283,6 +282,59 @@ public class WorkerThread extends Thread
// Check for interruption before we start fetching
if (Thread.currentThread().isInterrupted())
throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
+
+ // We need first to assemble an IPipelineSpecificationWithVersions object for each document we're going to process.
+ // We put this in a map so it can be looked up by document identifier.
+ // Create a full PipelineSpecification, including description strings. (This is per-job still, but can throw ServiceInterruptions, so we do it in here.)
+ IPipelineSpecification pipelineSpecification;
+ try
+ {
+ pipelineSpecification = new PipelineSpecification(pipelineSpecificationBasic,job,ingester);
+ }
+ catch (ServiceInterruption e)
+ {
+ // Handle service interruption from pipeline
+ if (!e.jobInactiveAbort())
+ Logging.jobs.warn("Service interruption reported for job "+
+ job.getID()+" connection '"+job.getConnectionName()+"': "+
+ e.getMessage());
+
+ if (!e.jobInactiveAbort() && e.isAbortOnFail())
+ abortOnFail = new ManifoldCFException("Repeated service interruptions - failure processing document"+((e.getCause()!=null)?": "+e.getCause().getMessage():""),e.getCause());
+
+ // All documents get requeued, because we never got far enough to make distinctions. All we have to decide
+ // is whether to requeue or abort.
+ List<QueuedDocument> requeueList = new ArrayList<QueuedDocument>();
+
+ for (QueuedDocument qd : activeDocuments)
+ {
+ DocumentDescription dd = qd.getDocumentDescription();
+ // Check for hard failure. But no hard failure is possible if it's a job inactive abort.
+ if (!e.jobInactiveAbort() && (dd.getFailTime() != -1L && dd.getFailTime() < e.getRetryTime() ||
+ dd.getFailRetryCount() == 0))
+ {
+ // Treat this as a hard failure.
+ if (e.isAbortOnFail())
+ {
+ rescanList.add(qd);
+ }
+ else
+ {
+ requeueList.add(qd);
+ }
+ }
+ else
+ {
+ requeueList.add(qd);
+ }
+ }
+
+ requeueDocuments(jobManager,requeueList,e.getRetryTime(),e.getFailTime(),
+ e.getFailRetryCount());
+
+ activeDocuments.clear();
+ pipelineSpecification = null;
+ }
if (activeDocuments.size() > 0)
{
@@ -297,8 +349,7 @@ public class WorkerThread extends Thread
// Build the processActivity object
- // We need first to assemble an IPipelineSpecificationWithVersions object for each document we're going to process.
- // We put this in a map so it can be looked up by document identifier.
+
Map<String,IPipelineSpecificationWithVersions> fetchPipelineSpecifications = new HashMap<String,IPipelineSpecificationWithVersions>();
String[] documentIDs = new String[activeDocuments.size()];
String[] documentIDHashes = new String[activeDocuments.size()];
@@ -331,7 +382,7 @@ public class WorkerThread extends Thread
// Now, process in bulk -- catching and handling ServiceInterruptions
try
{
- connector.processDocuments(documentIDs,existingVersions,activity,jobType,isDefaultAuthority);
+ connector.processDocuments(documentIDs,existingVersions,job.getSpecification(),activity,jobType,isDefaultAuthority);
for (QueuedDocument qd : activeDocuments)
{
@@ -394,12 +445,12 @@ public class WorkerThread extends Thread
// will abort the current job.
deleteList.clear();
- ArrayList requeueList = new ArrayList();
+ List<QueuedDocument> requeueList = new ArrayList<QueuedDocument>();
Set<String> fetchDocuments = new HashSet<String>();
for (QueuedDocument qd : activeDocuments)
{
- fetchDocuments.add(qd.getDocument().getDocumentDescription().getDocumentIdentifierHash());
+ fetchDocuments.add(qd.getDocumentDescription().getDocumentIdentifierHash());
}
List<QueuedDocument> newFinishList = new ArrayList<QueuedDocument>();
for (int i = 0; i < finishList.size(); i++)