Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/07/15 16:25:40 UTC
svn commit: r1610713 [2/2] - in /manifoldcf/trunk: ./
connectors/documentum/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/DCTM/
connectors/filenet/connector/src/main/java/org/apache/manifoldcf/crawler/connectors/filenet/
connectors/h...
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1610713&r1=1610712&r2=1610713&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Tue Jul 15 14:25:39 2014
@@ -5188,6 +5188,85 @@ public class JobManager implements IJobM
new Object[][][]{dataValues},currentTime,new IPriorityCalculator[]{priority},new String[][]{prereqEventNames});
}
+ /** Undo the addition of child documents to the queue, for a set of documents.
+ * This method is called at the end of document processing, to back out any incomplete additions to the queue and
+ * restore the status quo ante. Call this method instead of finishDocuments() if the
+ * addition of documents was not completed.
+ *@param jobID is the job identifier.
+ *@param legalLinkTypes is the set of legal link types that this connector generates.
+ *@param parentIdentifierHashes are the hashes of the document identifiers for which child link extraction just took place.
+ */
+ @Override
+ public void revertDocuments(Long jobID, String[] legalLinkTypes,
+ String[] parentIdentifierHashes)
+ throws ManifoldCFException
+ {
+ if (parentIdentifierHashes.length == 0)
+ return;
+
+ if (legalLinkTypes.length == 0)
+ {
+ while (true)
+ {
+ long sleepAmt = 0L;
+ database.beginTransaction(database.TRANSACTION_SERIALIZED);
+ try
+ {
+ // Revert carrydown records
+ carryDown.revertRecords(jobID,parentIdentifierHashes);
+ database.performCommit();
+ break;
+ }
+ catch (Error e)
+ {
+ database.signalRollback();
+ throw e;
+ }
+ catch (RuntimeException e)
+ {
+ database.signalRollback();
+ throw e;
+ }
+ finally
+ {
+ database.endTransaction();
+ sleepFor(sleepAmt);
+ }
+ }
+ }
+ else
+ {
+ // Revert both hopcount and carrydown
+ while (true)
+ {
+ long sleepAmt = 0L;
+ database.beginTransaction(database.TRANSACTION_SERIALIZED);
+ try
+ {
+ carryDown.revertRecords(jobID,parentIdentifierHashes);
+ hopCount.revertParents(jobID,parentIdentifierHashes);
+ database.performCommit();
+ break;
+ }
+ catch (Error e)
+ {
+ database.signalRollback();
+ throw e;
+ }
+ catch (RuntimeException e)
+ {
+ database.signalRollback();
+ throw e;
+ }
+ finally
+ {
+ database.endTransaction();
+ sleepFor(sleepAmt);
+ }
+ }
+ }
+ }
+
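[Editor's example] For context, the intended calling pattern pairs finishDocuments() with this new method: a minimal sketch, assuming a caller shaped like the framework's worker loop (processChildren() and the local variable names are illustrative, not part of this commit):

    boolean added = false;
    try
    {
      // Queue the child documents extracted from the parents...
      processChildren(jobID, parentHashes);
      // ...then let the hopcount engine complete its bookkeeping.
      jobManager.finishDocuments(jobID, legalLinkTypes, parentHashes, hopcountMode);
      added = true;
    }
    finally
    {
      if (!added)
        // The additions did not complete; back them out instead.
        jobManager.revertDocuments(jobID, legalLinkTypes, parentHashes);
    }
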
/** Complete adding child documents to the queue, for a set of documents.
* This method is called at the end of document processing, to help the hopcount tracking engine do its bookkeeping.
*@param jobID is the job identifier.
@@ -5240,6 +5319,11 @@ public class JobManager implements IJobM
database.signalRollback();
throw e;
}
+ catch (RuntimeException e)
+ {
+ database.signalRollback();
+ throw e;
+ }
finally
{
database.endTransaction();
@@ -5299,6 +5383,11 @@ public class JobManager implements IJobM
database.signalRollback();
throw e;
}
+ catch (RuntimeException e)
+ {
+ database.signalRollback();
+ throw e;
+ }
finally
{
database.endTransaction();
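[Editor's example] Both hunks above add a RuntimeException handler to JobManager's serialized-transaction retry loops, so unchecked exceptions roll back cleanly instead of leaking an open transaction. The idiom, reduced to a skeleton (the transactional work is a placeholder; other loops in this class may also catch ManifoldCFException to retry on transaction aborts, which is not shown here):

    while (true)
    {
      long sleepAmt = 0L;
      database.beginTransaction(database.TRANSACTION_SERIALIZED);
      try
      {
        doTransactionalWork();       // placeholder for the guarded operation
        database.performCommit();
        break;                       // success: leave the retry loop
      }
      catch (Error e)
      {
        database.signalRollback();   // JVM errors: roll back and propagate
        throw e;
      }
      catch (RuntimeException e)
      {
        database.signalRollback();   // newly covered: unchecked exceptions too
        throw e;
      }
      finally
      {
        database.endTransaction();
        sleepFor(sleepAmt);          // no-op here; a retry branch would set it
      }
    }
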
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1610713&r1=1610712&r2=1610713&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java Tue Jul 15 14:25:39 2014
@@ -80,9 +80,8 @@ public class WorkerThread extends Thread
IRepositoryConnectorPool repositoryConnectorPool = RepositoryConnectorPoolFactory.make(threadContext);
- List<DocumentToProcess> fetchList = new ArrayList<DocumentToProcess>();
+ // This is the set of documents that we will either mark as complete or requeue, depending on the kind of crawl.
List<QueuedDocument> finishList = new ArrayList<QueuedDocument>();
- Map<String,Integer> idHashIndexMap = new HashMap<String,Integer>();
// This is where we accumulate the document QueuedDocuments to be deleted from the job queue.
List<QueuedDocument> deleteList = new ArrayList<QueuedDocument>();
@@ -172,7 +171,6 @@ public class WorkerThread extends Thread
}
// Clear out all of our disposition lists
- fetchList.clear();
finishList.clear();
deleteList.clear();
ingesterCheckList.clear();
@@ -284,503 +282,375 @@ public class WorkerThread extends Thread
// Check for interruption before we start fetching
if (Thread.currentThread().isInterrupted())
throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
-
- if (activeDocuments.size() > 0)
+
+ // We first need to assemble an IPipelineSpecificationWithVersions object for each document we're going to process.
+ // We put this in a map so it can be looked up by document identifier.
+ // Create a full PipelineSpecification, including description strings. (This is per-job still, but can throw ServiceInterruptions, so we do it in here.)
+ IPipelineSpecification pipelineSpecification;
+ try
{
- // === Fetch document versions ===
- String[] currentDocIDHashArray = new String[activeDocuments.size()];
- String[] currentDocIDArray = new String[activeDocuments.size()];
- // We used to feed the old document version back to the repository connector so that it could
- // make decisions about whether to fetch, or just to call documentRecord(). The problem in a
- // multi-output world is that we may have had an error, and successfully output a document to
- // some outputs but not to others. But we do this in a specific order. It should be always safe
- // to get the document version from the *last* output in the sequence. The problem is, we need
- // to be able to figure out what that is, and it is currently an implementation detail of
- // IncrementalIngester. We solve this by allowing IncrementalIngester to make the decision.
-
- String[] oldVersionStringArray = new String[activeDocuments.size()];
+ pipelineSpecification = new PipelineSpecification(pipelineSpecificationBasic,job,ingester);
+ }
+ catch (ServiceInterruption e)
+ {
+ // Handle service interruption from pipeline
+ if (!e.jobInactiveAbort())
+ Logging.jobs.warn("Service interruption reported for job "+
+ job.getID()+" connection '"+job.getConnectionName()+"': "+
+ e.getMessage());
+
+ // All documents get requeued, because we never got far enough to make distinctions. All we have to decide
+ // is whether to requeue or abort.
+ List<QueuedDocument> requeueList = new ArrayList<QueuedDocument>();
- for (int i = 0; i < activeDocuments.size(); i++)
+ for (QueuedDocument qd : activeDocuments)
{
- QueuedDocument qd = activeDocuments.get(i);
- currentDocIDHashArray[i] = qd.getDocumentDescription().getDocumentIdentifierHash();
- currentDocIDArray[i] = qd.getDocumentDescription().getDocumentIdentifier();
- DocumentIngestStatus dis = qd.getLastIngestedStatus(lastIndexedOutputConnectionName);
- if (dis == null)
- oldVersionStringArray[i] = null;
+ DocumentDescription dd = qd.getDocumentDescription();
+ // Check for hard failure. But no hard failure is possible if it's a job inactive abort.
+ if (!e.jobInactiveAbort() && (dd.getFailTime() != -1L && dd.getFailTime() < e.getRetryTime() ||
+ dd.getFailRetryCount() == 0))
+ {
+ // Treat this as a hard failure.
+ if (e.isAbortOnFail())
+ {
+ rescanList.add(qd);
+ abortOnFail = new ManifoldCFException("Repeated service interruptions - failure processing document"+((e.getCause()!=null)?": "+e.getCause().getMessage():""),e.getCause());
+ }
+ else
+ {
+ requeueList.add(qd);
+ }
+ }
else
{
- oldVersionStringArray[i] = dis.getDocumentVersion();
- if (oldVersionStringArray[i] == null)
- oldVersionStringArray[i] = "";
+ requeueList.add(qd);
}
}
+
+ requeueDocuments(jobManager,requeueList,e.getRetryTime(),e.getFailTime(),
+ e.getFailRetryCount());
+
+ activeDocuments.clear();
+ pipelineSpecification = null;
+ }
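[Editor's example] The hard-failure test above (and its twin in the per-document disposition loop further down) is dense enough to merit restating; an equivalent standalone predicate, with a name invented for illustration:

    // A retry becomes a hard failure once the retry window has closed or the
    // retry count is exhausted -- unless the interruption merely reflects the
    // job going inactive, which is never treated as a failure.
    private static boolean isHardFailure(ServiceInterruption e, DocumentDescription dd)
    {
      if (e.jobInactiveAbort())
        return false;
      return (dd.getFailTime() != -1L && dd.getFailTime() < e.getRetryTime())
        || dd.getFailRetryCount() == 0;
    }
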
- // Create a full PipelineSpecification, including description strings. (This is per-job still, but can throw ServiceInterruptions, so we do it in here.)
- IPipelineSpecification pipelineSpecification = new PipelineSpecification(pipelineSpecificationBasic,job,ingester);
+ if (activeDocuments.size() > 0)
+ {
- Set<String> abortSet = new HashSet<String>();
- VersionActivity versionActivity = new VersionActivity(job.getID(),processID,connectionName,pipelineSpecification,connMgr,jobManager,ingester,abortSet,ingestLogger);
-
+ // **** New worker thread code starts here!!! ****
+
+ IExistingVersions existingVersions = new ExistingVersions(lastIndexedOutputConnectionName,activeDocuments);
String aclAuthority = connection.getACLAuthority();
if (aclAuthority == null)
aclAuthority = "";
boolean isDefaultAuthority = (aclAuthority.length() == 0);
- if (Logging.threads.isDebugEnabled())
- Logging.threads.debug("Worker thread getting versions for "+Integer.toString(currentDocIDArray.length)+" documents");
-
- // === Fetch documents ===
- // We start by getting the document version string.
- DocumentVersions documentVersions = new DocumentVersions();
- boolean successfulVersions = false;
- try
+ // Build the processActivity object
+
+
+ Map<String,IPipelineSpecificationWithVersions> fetchPipelineSpecifications = new HashMap<String,IPipelineSpecificationWithVersions>();
+ String[] documentIDs = new String[activeDocuments.size()];
+ int k = 0;
+ for (QueuedDocument qd : activeDocuments)
{
- connector.getDocumentVersions(documentVersions,currentDocIDArray,oldVersionStringArray,
- versionActivity,spec,jobType,isDefaultAuthority);
- successfulVersions = true;
-
- if (Logging.threads.isDebugEnabled())
- Logging.threads.debug("Worker thread done getting versions for "+Integer.toString(currentDocIDArray.length)+" documents");
-
+ fetchPipelineSpecifications.put(qd.getDocumentDescription().getDocumentIdentifierHash(),
+ new PipelineSpecificationWithVersions(pipelineSpecification,qd));
+ documentIDs[k++] = qd.getDocumentDescription().getDocumentIdentifier();
}
- catch (ServiceInterruption e)
+
+ ProcessActivity activity = new ProcessActivity(job.getID(),processID,
+ threadContext,rt,jobManager,ingester,
+ connectionName,pipelineSpecification,
+ fetchPipelineSpecifications,
+ currentTime,
+ job.getExpiration(),
+ job.getForcedMetadata(),
+ job.getInterval(),
+ job.getMaxInterval(),
+ job.getHopcountMode(),
+ connection,connector,connMgr,legalLinkTypes,ingestLogger,
+ newParameterVersion);
+ try
{
- // This service interruption comes from a point where we
- // know that no documents were ingested.
- // Therefore, active -> pending and activepurgatory -> pendingpurgatory
+ if (Logging.threads.isDebugEnabled())
+ Logging.threads.debug("Worker thread about to process "+Integer.toString(documentIDs.length)+" documents");
- if (!e.jobInactiveAbort())
+ // Now, process in bulk -- catching and handling ServiceInterruptions
+ ServiceInterruption serviceInterruption = null;
+ try
{
- Logging.jobs.warn("Pre-ingest service interruption reported for job "+
+ connector.processDocuments(documentIDs,existingVersions,job.getSpecification(),activity,jobType,isDefaultAuthority);
+ }
+ catch (ServiceInterruption e)
+ {
+ serviceInterruption = e;
+ if (!e.jobInactiveAbort())
+ Logging.jobs.warn("Service interruption reported for job "+
job.getID()+" connection '"+job.getConnectionName()+"': "+
e.getMessage());
}
- if (!e.jobInactiveAbort() && e.isAbortOnFail())
- abortOnFail = new ManifoldCFException("Repeated service interruptions - failure getting document version"+((e.getCause()!=null)?": "+e.getCause().getMessage():""),e.getCause());
-
- // Mark the current documents to be recrawled at the
- // time specified, with the proper error handling.
- List<QueuedDocument> newActiveList = new ArrayList<QueuedDocument>(activeDocuments.size());
- for (int i = 0; i < activeDocuments.size(); i++)
+ // Flush remaining references into the database!
+ activity.flush();
+
+ if (Logging.threads.isDebugEnabled())
+ Logging.threads.debug("Worker thread done processing "+Integer.toString(documentIDs.length)+" documents");
+
+ // Either way, handle the documents we were supposed to process. But if there was a service interruption,
+ // and the disposition of the document was unclear, then the document will need to be requeued instead of handled normally.
+ List<QueuedDocument> requeueList = new ArrayList<QueuedDocument>();
+
+ for (QueuedDocument qd : activeDocuments)
{
- QueuedDocument qd = activeDocuments.get(i);
- DocumentDescription dd = qd.getDocumentDescription();
- // If either we are going to be requeuing beyond the fail time, OR
- // the number of retries available has hit 0, THEN we treat this
- // as either an "ignore" or a hard error.
- if (!e.jobInactiveAbort() && (dd.getFailTime() != -1L && dd.getFailTime() < e.getRetryTime() ||
- dd.getFailRetryCount() == 0))
+ // If this document was aborted, then treat it specially.
+ if (activity.wasDocumentAborted(qd.getDocumentDescription().getDocumentIdentifier()))
{
- // Treat this as a hard failure.
- if (e.isAbortOnFail())
- {
- rescanList.add(qd);
- }
- // We want this particular document to be not included in the
- // reprocessing. Therefore, we do the same thing as we would
- // if we got back a null version.
- deleteList.add(qd);
+ // Special treatment for aborted documents.
+ // We ignore the returned version string completely, since it's presumed that processing was not completed for this doc.
+ // We want to give up immediately on this one, and just requeue it for immediate reprocessing (pending its prereqs being all met).
+ // Add to the finish list, so it gets requeued. Because the document is already marked as aborted, this should be enough to cause an
+ // unconditional requeue.
+ finishList.add(qd);
}
- else
+ else if (activity.wasDocumentDeleted(qd.getDocumentDescription().getDocumentIdentifier()))
{
- // Retry this document according to the parameters provided.
- jobManager.resetDocument(dd,e.getRetryTime(),
- IJobManager.ACTION_RESCAN,e.getFailTime(),e.getFailRetryCount());
- qd.setProcessed();
+ deleteList.add(qd);
}
- }
-
- // All active documents have been removed from the list
- activeDocuments.clear();
-
- }
-
- // If version fetch was successful, the go on to processing phase
- if (successfulVersions)
- {
- // This try{ } is for releasing document versions at the connector level.
- try
- {
-
- // Loop through documents now, and amass what we need to fetch.
- // We also need to tally: (1) what needs to be marked as deleted via
- // jobManager.markDocumentDeleted();
- // (2) what needs to be noted as a deletion to ingester
- // (3) what needs to be noted as a check for the ingester
- for (int i = 0; i < activeDocuments.size(); i++)
+ else if (serviceInterruption != null)
{
- QueuedDocument qd = activeDocuments.get(i);
+ // Service interruption has precedence over unchanged, because we might have been interrupted while scanning the document
+ // for references
DocumentDescription dd = qd.getDocumentDescription();
- // If this document was aborted, then treat it specially; we never go on to fetch it, for one thing.
- if (abortSet.contains(dd.getDocumentIdentifier()))
+ // Check for hard failure. But no hard failure is possible if it's a job inactive abort.
+ if (!serviceInterruption.jobInactiveAbort() && (dd.getFailTime() != -1L && dd.getFailTime() < serviceInterruption.getRetryTime() ||
+ dd.getFailRetryCount() == 0))
{
- // Special treatment for aborted documents.
- // We ignore the returned version string completely, since it's presumed that processing was not completed for this doc.
- // We want to give up immediately on this one, and just requeue it for immediate reprocessing (pending its prereqs being all met).
- // Add to the finish list, so it gets requeued. Because the document is already marked as aborted, this should be enough to cause an
- // unconditional requeue.
- finishList.add(qd);
- }
- else
- {
- // Compare against old version.
- // We call the incremental ingester to make the decision for us as to whether we refetch a document or not.
-
- String documentIDHash = dd.getDocumentIdentifierHash();
- VersionContext newDocContext = documentVersions.getDocumentVersion(dd.getDocumentIdentifier());
-
- if (newDocContext == null)
+ // Treat this as a hard failure.
+ if (serviceInterruption.isAbortOnFail())
{
- deleteList.add(qd);
+ // Make sure that the job aborts.
+ abortOnFail = new ManifoldCFException("Repeated service interruptions - failure processing document"+((serviceInterruption.getCause()!=null)?": "+serviceInterruption.getCause().getMessage():""),serviceInterruption.getCause());
+ rescanList.add(qd);
}
else
{
- // Not getting deleted, so we must do the finish processing (i.e. conditionally requeue), so note that.
- finishList.add(qd);
-
- // See if we need to add, or update.
- IPipelineSpecificationWithVersions specWithVersions = new PipelineSpecificationWithVersions(pipelineSpecification,qd);
- boolean allowIngest = ingester.checkFetchDocument(specWithVersions,
- newDocContext.getVersionString(),
- newParameterVersion,
- aclAuthority);
-
- fetchList.add(new DocumentToProcess(qd,!allowIngest));
- if (!allowIngest)
- ingesterCheckList.add(documentIDHash);
+ // Skip the document, rather than failing.
+ // We want this particular document to be not included in the
+ // reprocessing. Therefore, we do the same thing as we would
+ // if we got back a null version.
+ deleteList.add(qd);
}
}
-
- }
- activeDocuments.clear();
-
- // We are done transfering activeDocuments documents to the other lists for processing.
- // Those lists will all need to be processed, but the processList is special because it
- // must be processed in the same context as the version fetch.
-
- // Note the documents that have been checked but not reingested. This should happen BEFORE we need
- // the statistics (which are calculated during the finishlist step below)
- if (ingesterCheckList.size() > 0)
- {
- String[] checkClasses = new String[ingesterCheckList.size()];
- String[] checkIDs = new String[ingesterCheckList.size()];
- for (int i = 0; i < checkIDs.length; i++)
+ else
{
- checkClasses[i] = connectionName;
- checkIDs[i] = ingesterCheckList.get(i);
+ // Not a hard failure. Requeue.
+ requeueList.add(qd);
}
- ingester.documentCheckMultiple(pipelineSpecificationBasic,checkClasses,checkIDs,currentTime);
}
+ else if (activity.wasDocumentUnchanged(qd.getDocumentDescription().getDocumentIdentifier()))
+ {
+ finishList.add(qd);
+ ingesterCheckList.add(qd.getDocumentDescription().getDocumentIdentifierHash());
+ }
+ else
+ {
+ // All documents not specifically called out above are simply finished, since we know they haven't been deleted.
+ finishList.add(qd);
+ }
+ }
- // First, make the things we will need for all subsequent steps.
- // We need first to assemble an IPipelineSpecificationWithVersions object for each document we're going to process.
- // We put this in a map so it can be looked up by document identifier.
- Map<String,IPipelineSpecificationWithVersions> fetchPipelineSpecifications = new HashMap<String,IPipelineSpecificationWithVersions>();
- for (int i = 0; i < fetchList.size(); i++)
+ if (serviceInterruption != null)
+ {
+ // Requeue the documents we've identified as needing to be repeated
+ requeueDocuments(jobManager,requeueList,serviceInterruption.getRetryTime(),serviceInterruption.getFailTime(),
+ serviceInterruption.getFailRetryCount());
+ }
+
+ // Note the documents that have been checked but not reingested. This should happen BEFORE we need
+ // the statistics (which are calculated during the finishList step below).
+ if (ingesterCheckList.size() > 0)
+ {
+ String[] checkClasses = new String[ingesterCheckList.size()];
+ String[] checkIDs = new String[ingesterCheckList.size()];
+ for (int i = 0; i < checkIDs.length; i++)
{
- QueuedDocument qd = fetchList.get(i).getDocument();
- fetchPipelineSpecifications.put(qd.getDocumentDescription().getDocumentIdentifierHash(),
- new PipelineSpecificationWithVersions(pipelineSpecification,qd));
+ checkClasses[i] = connectionName;
+ checkIDs[i] = ingesterCheckList.get(i);
}
-
- ProcessActivity activity = new ProcessActivity(job.getID(),processID,
- threadContext,rt,jobManager,ingester,
- connectionName,pipelineSpecification,
- fetchPipelineSpecifications,
- currentTime,
- job.getExpiration(),
- job.getForcedMetadata(),
- job.getInterval(),
- job.getMaxInterval(),
- job.getHopcountMode(),
- connection,connector,connMgr,legalLinkTypes,ingestLogger,abortSet,
- newParameterVersion);
- try
+ ingester.documentCheckMultiple(pipelineSpecificationBasic,checkClasses,checkIDs,currentTime);
+ }
+
+ // Process the finish list!
+ if (finishList.size() > 0)
+ {
+ // "Finish" the documents (removing unneeded carrydown info, and compute hopcounts).
+ // This can ONLY be done on fully-completed documents; everything else should be left in a dangling
+ // state (which we know is OK because it will be fixed the next time the document is attempted).
+ String[] documentIDHashes = new String[finishList.size()];
+ k = 0;
+ for (QueuedDocument qd : finishList)
{
+ documentIDHashes[k++] = qd.getDocumentDescription().getDocumentIdentifierHash();
+ }
+ DocumentDescription[] requeueCandidates = jobManager.finishDocuments(job.getID(),legalLinkTypes,documentIDHashes,job.getHopcountMode());
+ ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,connector,connection,rt,currentTime);
- // Finishlist and Fetchlist are parallel. Fetchlist contains what we need to process.
- if (fetchList.size() > 0)
+ // In both job types, we have to go through the finishList to figure out what to do with the documents.
+ // In the case of a document that was aborted, we must requeue it for immediate reprocessing in BOTH job types.
+ switch (job.getType())
+ {
+ case IJobDescription.TYPE_CONTINUOUS:
{
- // Build a list of id's and flags
- String[] processIDs = new String[fetchList.size()];
- String[] processIDHashes = new String[fetchList.size()];
- boolean[] scanOnly = new boolean[fetchList.size()];
-
- for (int i = 0; i < fetchList.size(); i++)
+ // We need to populate timeArray
+ String[] timeIDClasses = new String[finishList.size()];
+ String[] timeIDHashes = new String[finishList.size()];
+ for (int i = 0; i < timeIDHashes.length; i++)
{
- DocumentToProcess dToP = fetchList.get(i);
- DocumentDescription dd = dToP.getDocument().getDocumentDescription();
- processIDs[i] = dd.getDocumentIdentifier();
- processIDHashes[i] = dd.getDocumentIdentifierHash();
- scanOnly[i] = dToP.getScanOnly();
+ QueuedDocument qd = (QueuedDocument)finishList.get(i);
+ DocumentDescription dd = qd.getDocumentDescription();
+ String documentIDHash = dd.getDocumentIdentifierHash();
+ timeIDClasses[i] = connectionName;
+ timeIDHashes[i] = documentIDHash;
}
-
- if (Thread.currentThread().isInterrupted())
- throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
-
- if (Logging.threads.isDebugEnabled())
- Logging.threads.debug("Worker thread about to process "+Integer.toString(processIDs.length)+" documents");
-
- // Now, process in bulk
- try
+ long[] timeArray = ingester.getDocumentUpdateIntervalMultiple(pipelineSpecificationBasic,timeIDClasses,timeIDHashes);
+ Long[] recheckTimeArray = new Long[timeArray.length];
+ int[] actionArray = new int[timeArray.length];
+ DocumentDescription[] recrawlDocs = new DocumentDescription[finishList.size()];
+ for (int i = 0; i < finishList.size(); i++)
{
+ QueuedDocument qd = finishList.get(i);
+ recrawlDocs[i] = qd.getDocumentDescription();
+ String documentID = recrawlDocs[i].getDocumentIdentifier();
- connector.processDocuments(processIDs,documentVersions,activity,scanOnly,jobType);
-
- // Flush remaining references into the database!
- activity.flush();
-
- // "Finish" the documents (removing unneeded carrydown info, etc.)
- DocumentDescription[] requeueCandidates = jobManager.finishDocuments(job.getID(),legalLinkTypes,processIDHashes,job.getHopcountMode());
+ // If aborted due to sequencing issue, then requeue for reprocessing immediately, ignoring everything else.
+ boolean wasAborted = activity.wasDocumentAborted(documentID);
+ if (wasAborted)
+ {
+ // Requeue for immediate reprocessing
+ if (Logging.scheduling.isDebugEnabled())
+ Logging.scheduling.debug("Document '"+documentID+"' will be RESCANNED as soon as prerequisites are met");
- ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,
- requeueCandidates,connector,connection,rt,currentTime);
+ actionArray[i] = IJobManager.ACTION_RESCAN;
+ recheckTimeArray[i] = new Long(0L); // Must not use null; that means 'never'.
+ }
+ else
+ {
+ // Calculate the next time to run, or time to expire.
- if (Logging.threads.isDebugEnabled())
- Logging.threads.debug("Worker thread done processing "+Integer.toString(processIDs.length)+" documents");
+ // For run time, the formula is to calculate the running avg interval between changes,
+ // add an additional interval (which comes from the job description),
+ // and add that to the current time.
+ // One caveat: we really want to calculate the interval from the last
+ // time a change was detected, but this is not implemented yet.
+ long timeAmt = timeArray[i];
+ // null value indicates never to schedule
- }
- catch (ServiceInterruption e)
- {
- // This service interruption could have resulted
- // after some or all of the documents ingested.
- // They will therefore need to go into the PENDINGPURGATORY
- // state.
-
- if (!e.jobInactiveAbort())
- Logging.jobs.warn("Service interruption reported for job "+
- job.getID()+" connection '"+job.getConnectionName()+"': "+
- e.getMessage());
-
- if (!e.jobInactiveAbort() && e.isAbortOnFail())
- abortOnFail = new ManifoldCFException("Repeated service interruptions - failure processing document"+((e.getCause()!=null)?": "+e.getCause().getMessage():""),e.getCause());
-
- // Mark the current documents to be recrawled in the
- // time specified, except for the ones beyond their limits.
- // Those will either be deleted, or an exception will be thrown that
- // will abort the current job.
+ Long recrawlTime = activity.calculateDocumentRescheduleTime(currentTime,timeAmt,documentID);
+ Long expireTime = activity.calculateDocumentExpireTime(currentTime,documentID);
- deleteList.clear();
- ArrayList requeueList = new ArrayList();
- Set<String> fetchDocuments = new HashSet<String>();
- for (int i = 0; i < fetchList.size(); i++)
- {
- fetchDocuments.add(fetchList.get(i).getDocument().getDocumentDescription().getDocumentIdentifierHash());
- }
- List<QueuedDocument> newFinishList = new ArrayList<QueuedDocument>();
- for (int i = 0; i < finishList.size(); i++)
- {
- QueuedDocument qd = finishList.get(i);
- if (fetchDocuments.contains(qd.getDocumentDescription().getDocumentIdentifierHash()))
+ // Merge the two times together. We decide on the action based on which time comes first.
+ if (expireTime == null || (recrawlTime != null && recrawlTime.longValue() < expireTime.longValue()))
{
- DocumentDescription dd = qd.getDocumentDescription();
- // Check for hard failure. But no hard failure possible of it's a job inactive abort.
- if (!e.jobInactiveAbort() && (dd.getFailTime() != -1L && dd.getFailTime() < e.getRetryTime() ||
- dd.getFailRetryCount() == 0))
- {
- // Treat this as a hard failure.
- if (e.isAbortOnFail())
- {
- rescanList.add(qd);
- }
- else
- {
- // We want this particular document to be not included in the
- // reprocessing. Therefore, we do the same thing as we would
- // if we got back a null version.
- deleteList.add(qd);
- }
- }
- else
- {
- requeueList.add(qd);
- }
+ if (Logging.scheduling.isDebugEnabled())
+ Logging.scheduling.debug("Document '"+documentID+"' will be RESCANNED at "+recrawlTime.toString());
+ recheckTimeArray[i] = recrawlTime;
+ actionArray[i] = IJobManager.ACTION_RESCAN;
+ }
+ else if (recrawlTime == null || (expireTime != null && recrawlTime.longValue() > expireTime.longValue()))
+ {
+ if (Logging.scheduling.isDebugEnabled())
+ Logging.scheduling.debug("Document '"+documentID+"' will be REMOVED at "+expireTime.toString());
+ recheckTimeArray[i] = expireTime;
+ actionArray[i] = IJobManager.ACTION_REMOVE;
}
else
- newFinishList.add(qd);
+ {
+ // Default activity if conflict will be rescan
+ if (Logging.scheduling.isDebugEnabled() && recrawlTime != null)
+ Logging.scheduling.debug("Document '"+documentID+"' will be RESCANNED at "+recrawlTime.toString());
+ recheckTimeArray[i] = recrawlTime;
+ actionArray[i] = IJobManager.ACTION_RESCAN;
+ }
}
-
- // Requeue the documents we've identified
- requeueDocuments(jobManager,requeueList,e.getRetryTime(),e.getFailTime(),
- e.getFailRetryCount());
-
- // We've disposed of all the documents, so finishlist is now clear
- finishList = newFinishList;
}
- } // End of fetching
- if (finishList.size() > 0)
+ jobManager.requeueDocumentMultiple(recrawlDocs,recheckTimeArray,actionArray);
+
+ }
+ break;
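[Editor's example] The merge of recrawl and expiration times above picks whichever action fires first, defaulting to a rescan on ties. A worked sketch with concrete numbers (the values are illustrative):

    long currentTime = System.currentTimeMillis();
    Long recrawlTime = new Long(currentTime + 60L * 60000L);       // rescan due in 1 hour
    Long expireTime = new Long(currentTime + 24L * 60L * 60000L);  // removal due in 24 hours

    int action;
    Long recheckTime;
    if (expireTime == null || (recrawlTime != null && recrawlTime.longValue() < expireTime.longValue()))
    {
      action = IJobManager.ACTION_RESCAN;      // recrawl comes first, as here
      recheckTime = recrawlTime;
    }
    else if (recrawlTime == null || recrawlTime.longValue() > expireTime.longValue())
    {
      action = IJobManager.ACTION_REMOVE;      // expiration comes first
      recheckTime = expireTime;
    }
    else
    {
      action = IJobManager.ACTION_RESCAN;      // equal times: rescan wins
      recheckTime = recrawlTime;
    }
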
+ case IJobDescription.TYPE_SPECIFIED:
{
- // In both job types, we have to go through the finishList to figure out what to do with the documents.
- // In the case of a document that was aborted, we must requeue it for immediate reprocessing in BOTH job types.
- switch (job.getType())
+ // Separate the ones we actually finished from the ones we need to requeue because they were aborted
+ List<DocumentDescription> completedList = new ArrayList<DocumentDescription>();
+ List<DocumentDescription> abortedList = new ArrayList<DocumentDescription>();
+ for (int i = 0; i < finishList.size(); i++)
{
- case IJobDescription.TYPE_CONTINUOUS:
+ QueuedDocument qd = finishList.get(i);
+ DocumentDescription dd = qd.getDocumentDescription();
+ if (activity.wasDocumentAborted(dd.getDocumentIdentifier()))
{
- // We need to populate timeArray
- String[] timeIDClasses = new String[finishList.size()];
- String[] timeIDHashes = new String[finishList.size()];
- for (int i = 0; i < timeIDHashes.length; i++)
- {
- QueuedDocument qd = (QueuedDocument)finishList.get(i);
- DocumentDescription dd = qd.getDocumentDescription();
- String documentIDHash = dd.getDocumentIdentifierHash();
- timeIDClasses[i] = connectionName;
- timeIDHashes[i] = documentIDHash;
- }
- long[] timeArray = ingester.getDocumentUpdateIntervalMultiple(pipelineSpecificationBasic,timeIDClasses,timeIDHashes);
- Long[] recheckTimeArray = new Long[timeArray.length];
- int[] actionArray = new int[timeArray.length];
- DocumentDescription[] recrawlDocs = new DocumentDescription[finishList.size()];
- for (int i = 0; i < finishList.size(); i++)
- {
- QueuedDocument qd = finishList.get(i);
- recrawlDocs[i] = qd.getDocumentDescription();
- String documentID = recrawlDocs[i].getDocumentIdentifier();
-
- // If aborted due to sequencing issue, then requeue for reprocessing immediately, ignoring everything else.
- boolean wasAborted = abortSet.contains(documentID);
- if (wasAborted)
- {
- // Requeue for immediate reprocessing
- if (Logging.scheduling.isDebugEnabled())
- Logging.scheduling.debug("Document '"+documentID+"' will be RESCANNED as soon as prerequisites are met");
-
- actionArray[i] = IJobManager.ACTION_RESCAN;
- recheckTimeArray[i] = new Long(0L); // Must not use null; that means 'never'.
- }
- else
- {
- // Calculate the next time to run, or time to expire.
-
- // For run time, the formula is to calculate the running avg interval between changes,
- // add an additional interval (which comes from the job description),
- // and add that to the current time.
- // One caveat: we really want to calculate the interval from the last
- // time change was detected, but this is not implemented yet.
- long timeAmt = timeArray[i];
- // null value indicates never to schedule
-
- Long recrawlTime = activity.calculateDocumentRescheduleTime(currentTime,timeAmt,documentID);
- Long expireTime = activity.calculateDocumentExpireTime(currentTime,documentID);
-
-
- // Merge the two times together. We decide on the action based on the action with the lowest time.
- if (expireTime == null || (recrawlTime != null && recrawlTime.longValue() < expireTime.longValue()))
- {
- if (Logging.scheduling.isDebugEnabled())
- Logging.scheduling.debug("Document '"+documentID+"' will be RESCANNED at "+recrawlTime.toString());
- recheckTimeArray[i] = recrawlTime;
- actionArray[i] = IJobManager.ACTION_RESCAN;
- }
- else if (recrawlTime == null || (expireTime != null && recrawlTime.longValue() > expireTime.longValue()))
- {
- if (Logging.scheduling.isDebugEnabled())
- Logging.scheduling.debug("Document '"+documentID+"' will be REMOVED at "+expireTime.toString());
- recheckTimeArray[i] = expireTime;
- actionArray[i] = IJobManager.ACTION_REMOVE;
- }
- else
- {
- // Default activity if conflict will be rescan
- if (Logging.scheduling.isDebugEnabled() && recrawlTime != null)
- Logging.scheduling.debug("Document '"+documentID+"' will be RESCANNED at "+recrawlTime.toString());
- recheckTimeArray[i] = recrawlTime;
- actionArray[i] = IJobManager.ACTION_RESCAN;
- }
- }
- }
-
- jobManager.requeueDocumentMultiple(recrawlDocs,recheckTimeArray,actionArray);
-
+ // The document was aborted, so put it into the abortedList
+ abortedList.add(dd);
}
- break;
- case IJobDescription.TYPE_SPECIFIED:
+ else
{
- // Separate the ones we actually finished from the ones we need to requeue because they were aborted
- List<DocumentDescription> completedList = new ArrayList<DocumentDescription>();
- List<DocumentDescription> abortedList = new ArrayList<DocumentDescription>();
- for (int i = 0; i < finishList.size(); i++)
- {
- QueuedDocument qd = finishList.get(i);
- DocumentDescription dd = qd.getDocumentDescription();
- if (abortSet.contains(dd.getDocumentIdentifier()))
- {
- // The document was aborted, so put it into the abortedList
- abortedList.add(dd);
- }
- else
- {
- // The document was completed.
- completedList.add(dd);
- }
- }
-
- // Requeue the ones that must be repeated
- if (abortedList.size() > 0)
- {
- DocumentDescription[] docDescriptions = new DocumentDescription[abortedList.size()];
- Long[] recheckTimeArray = new Long[docDescriptions.length];
- int[] actionArray = new int[docDescriptions.length];
- for (int i = 0; i < docDescriptions.length; i++)
- {
- docDescriptions[i] = abortedList.get(i);
- recheckTimeArray[i] = new Long(0L);
- actionArray[i] = IJobManager.ACTION_RESCAN;
- }
-
- jobManager.requeueDocumentMultiple(docDescriptions,recheckTimeArray,actionArray);
- }
-
- // Mark the ones completed that were actually completed.
- if (completedList.size() > 0)
- {
- DocumentDescription[] docDescriptions = new DocumentDescription[completedList.size()];
- for (int i = 0; i < docDescriptions.length; i++)
- {
- docDescriptions[i] = (DocumentDescription)completedList.get(i);
- }
-
- jobManager.markDocumentCompletedMultiple(docDescriptions);
- }
+ // The document was completed.
+ completedList.add(dd);
}
- break;
- default:
- throw new ManifoldCFException("Unexpected value for job type: '"+Integer.toString(job.getType())+"'");
}
- // Finally, if we're still alive, mark everything as "processed".
- for (int i = 0; i < finishList.size(); i++)
+ // Requeue the ones that must be repeated
+ if (abortedList.size() > 0)
{
- QueuedDocument qd = finishList.get(i);
- qd.setProcessed();
+ DocumentDescription[] docDescriptions = new DocumentDescription[abortedList.size()];
+ Long[] recheckTimeArray = new Long[docDescriptions.length];
+ int[] actionArray = new int[docDescriptions.length];
+ for (int i = 0; i < docDescriptions.length; i++)
+ {
+ docDescriptions[i] = abortedList.get(i);
+ recheckTimeArray[i] = new Long(0L);
+ actionArray[i] = IJobManager.ACTION_RESCAN;
+ }
+
+ jobManager.requeueDocumentMultiple(docDescriptions,recheckTimeArray,actionArray);
}
+ // Mark the ones completed that were actually completed.
+ if (completedList.size() > 0)
+ {
+ DocumentDescription[] docDescriptions = new DocumentDescription[completedList.size()];
+ for (int i = 0; i < docDescriptions.length; i++)
+ {
+ docDescriptions[i] = (DocumentDescription)completedList.get(i);
+ }
+
+ jobManager.markDocumentCompletedMultiple(docDescriptions);
+ }
}
-
+ break;
+ default:
+ throw new ManifoldCFException("Unexpected value for job type: '"+Integer.toString(job.getType())+"'");
}
- finally
+
+ // Finally, if we're still alive, mark everything we finished as "processed".
+ for (QueuedDocument qd : finishList)
{
- // Make sure we don't leave any dangling carrydown files
- activity.discard();
+ qd.setProcessed();
}
-
- // Successful processing of the set
- // We count 'get version' time in the average, so even if we decide not to process a doc
- // it still counts.
- queueTracker.noteConnectionPerformance(qds.getCount(),connectionName,System.currentTimeMillis() - processingStartTime);
-
}
- finally
- {
- // Release any document temporary storage held by the connector
- connector.releaseDocumentVersions(currentDocIDArray,documentVersions);
- }
-
}
+ finally
+ {
+ // Make sure we don't leave any dangling carrydown files
+ activity.discard();
+ }
+
+ // Successful processing of the set
+ // We count 'get version' time in the average, so even if we decide not to process a doc
+ // it still counts.
+ queueTracker.noteConnectionPerformance(qds.getCount(),connectionName,System.currentTimeMillis() - processingStartTime);
+
}
// Now, handle the delete list
@@ -1110,9 +980,8 @@ public class WorkerThread extends Thread
requeueCandidates,connector,connection,rt,currentTime);
// Mark all these as done
- for (int i = 0; i < jobmanagerRemovalList.size(); i++)
+ for (QueuedDocument qd : jobmanagerRemovalList)
{
- QueuedDocument qd = jobmanagerRemovalList.get(i);
qd.setProcessed();
}
}
@@ -1132,34 +1001,22 @@ public class WorkerThread extends Thread
{
DocumentDescription[] requeueDocs = new DocumentDescription[requeueList.size()];
- int i = 0;
- while (i < requeueDocs.length)
+ for (int i = 0; i < requeueDocs.length; i++)
{
QueuedDocument qd = requeueList.get(i);
DocumentDescription dd = qd.getDocumentDescription();
requeueDocs[i] = dd;
- i++;
}
jobManager.resetDocumentMultiple(requeueDocs,retryTime,IJobManager.ACTION_RESCAN,failTime,failCount);
- i = 0;
- while (i < requeueList.size())
+ for (QueuedDocument qd : requeueList)
{
- QueuedDocument qd = requeueList.get(i++);
qd.setProcessed();
}
}
}
- protected static String packTransformations(String[] transformationNames, String[] transformationDescriptionStrings)
- {
- StringBuilder sb = new StringBuilder();
- packList(sb,transformationNames,'+');
- packList(sb,transformationDescriptionStrings,'!');
- return sb.toString();
- }
-
/** Another stuffer for packing lists of variable length */
protected static void packList(StringBuilder output, String[] values, char delimiter)
{
@@ -1219,231 +1076,6 @@ public class WorkerThread extends Thread
// Nested classes
- /** Version activity class wraps access to activity history.
- */
- protected static class VersionActivity implements IVersionActivity
- {
- protected final Long jobID;
- protected final String processID;
- protected final String connectionName;
- protected final IPipelineSpecification pipelineSpecification;
- protected final IRepositoryConnectionManager connMgr;
- protected final IJobManager jobManager;
- protected final IIncrementalIngester ingester;
- protected final Set<String> abortSet;
- protected final CheckActivity checkActivity;
- /** Constructor.
- */
- public VersionActivity(Long jobID, String processID,
- String connectionName, IPipelineSpecification pipelineSpecification,
- IRepositoryConnectionManager connMgr,
- IJobManager jobManager, IIncrementalIngester ingester, Set<String> abortSet,
- CheckActivity checkActivity)
- {
- this.jobID = jobID;
- this.processID = processID;
- this.connectionName = connectionName;
- this.pipelineSpecification = pipelineSpecification;
- this.connMgr = connMgr;
- this.jobManager = jobManager;
- this.ingester = ingester;
- this.abortSet = abortSet;
- this.checkActivity = checkActivity;
- }
-
- /** Check whether a mime type is indexable by the currently specified output connector.
- *@param mimeType is the mime type to check, not including any character set specification.
- *@return true if the mime type is indexable.
- */
- @Override
- public boolean checkMimeTypeIndexable(String mimeType)
- throws ManifoldCFException, ServiceInterruption
- {
- return ingester.checkMimeTypeIndexable(
- pipelineSpecification,
- mimeType,
- checkActivity);
- }
-
- /** Check whether a document is indexable by the currently specified output connector.
- *@param localFile is the local copy of the file to check.
- *@return true if the document is indexable.
- */
- @Override
- public boolean checkDocumentIndexable(File localFile)
- throws ManifoldCFException, ServiceInterruption
- {
- return ingester.checkDocumentIndexable(
- pipelineSpecification,
- localFile,
- checkActivity);
- }
-
- /** Check whether a document of a specified length is indexable by the currently specified output connector.
- *@param length is the length to check.
- *@return true if the document is indexable.
- */
- @Override
- public boolean checkLengthIndexable(long length)
- throws ManifoldCFException, ServiceInterruption
- {
- return ingester.checkLengthIndexable(
- pipelineSpecification,
- length,
- checkActivity);
- }
-
- /** Pre-determine whether a document's URL is indexable by this connector. This method is used by participating repository connectors
- * to help filter out documents that are not worth indexing.
- *@param url is the URL of the document.
- *@return true if the file is indexable.
- */
- @Override
- public boolean checkURLIndexable(String url)
- throws ManifoldCFException, ServiceInterruption
- {
- return ingester.checkURLIndexable(
- pipelineSpecification,
- url,
- checkActivity);
- }
-
- /** Record time-stamped information about the activity of the connector.
- *@param startTime is either null or the time since the start of epoch in milliseconds (Jan 1, 1970). Every
- * activity has an associated time; the startTime field records when the activity began. A null value
- * indicates that the start time and the finishing time are the same.
- *@param activityType is a string which is fully interpretable only in the context of the connector involved, which is
- * used to categorize what kind of activity is being recorded. For example, a web connector might record a
- * "fetch document" activity. Cannot be null.
- *@param dataSize is the number of bytes of data involved in the activity, or null if not applicable.
- *@param entityIdentifier is a (possibly long) string which identifies the object involved in the history record.
- * The interpretation of this field will differ from connector to connector. May be null.
- *@param resultCode contains a terse description of the result of the activity. The description is limited in
- * size to 255 characters, and can be interpreted only in the context of the current connector. May be null.
- *@param resultDescription is a (possibly long) human-readable string which adds detail, if required, to the result
- * described in the resultCode field. This field is not meant to be queried on. May be null.
- *@param childIdentifiers is a set of child entity identifiers associated with this activity. May be null.
- */
- @Override
- public void recordActivity(Long startTime, String activityType, Long dataSize,
- String entityIdentifier, String resultCode, String resultDescription, String[] childIdentifiers)
- throws ManifoldCFException
- {
- connMgr.recordHistory(connectionName,startTime,activityType,dataSize,entityIdentifier,resultCode,
- resultDescription,childIdentifiers);
- }
-
- /** Retrieve data passed from parents to a specified child document.
- *@param localIdentifier is the document identifier of the document we want the recorded data for.
- *@param dataName is the name of the data items to retrieve.
- *@return an array containing the unique data values passed from ALL parents. Note that these are in no particular order, and there will not be any duplicates.
- */
- @Override
- public String[] retrieveParentData(String localIdentifier, String dataName)
- throws ManifoldCFException
- {
- return jobManager.retrieveParentData(jobID,ManifoldCF.hash(localIdentifier),dataName);
- }
-
- /** Retrieve data passed from parents to a specified child document.
- *@param localIdentifier is the document identifier of the document we want the recorded data for.
- *@param dataName is the name of the data items to retrieve.
- *@return an array containing the unique data values passed from ALL parents. Note that these are in no particular order, and there will not be any duplicates.
- */
- @Override
- public CharacterInput[] retrieveParentDataAsFiles(String localIdentifier, String dataName)
- throws ManifoldCFException
- {
- return jobManager.retrieveParentDataAsFiles(jobID,ManifoldCF.hash(localIdentifier),dataName);
- }
-
- /** Check whether current job is still active.
- * This method is provided to allow an individual connector that needs to wait on some long-term condition to give up waiting due to the job
- * itself being aborted. If the connector should abort, this method will raise a properly-formed ServiceInterruption, which if thrown to the
- * caller, will signal that the current versioning activity remains incomplete and must be retried when the job is resumed.
- */
- @Override
- public void checkJobStillActive()
- throws ManifoldCFException, ServiceInterruption
- {
- if (jobManager.checkJobActive(jobID) == false)
- throw new ServiceInterruption("Job no longer active",System.currentTimeMillis(),true);
- }
-
- /** Begin an event sequence.
- * This method should be called by a connector when a sequencing event should enter the "pending" state. If the event is already in that state,
- * this method will return false, otherwise true. The connector has the responsibility of appropriately managing sequencing given the response
- * status.
- *@param eventName is the event name.
- *@return false if the event is already in the "pending" state.
- */
- @Override
- public boolean beginEventSequence(String eventName)
- throws ManifoldCFException
- {
- return jobManager.beginEventSequence(processID,eventName);
- }
-
- /** Complete an event sequence.
- * This method should be called to signal that an event is no longer in the "pending" state. This can mean that the prerequisite processing is
- * completed, but it can also mean that prerequisite processing was aborted or cannot be completed.
- * Note well: This method should not be called unless the connector is CERTAIN that an event is in progress, and that the current thread has
- * the sole right to complete it. Otherwise, race conditions can develop which would be difficult to diagnose.
- *@param eventName is the event name.
- */
- @Override
- public void completeEventSequence(String eventName)
- throws ManifoldCFException
- {
- jobManager.completeEventSequence(eventName);
- }
-
- /** Abort processing a document (for sequencing reasons).
- * This method should be called in order to cause the specified document to be requeued for later processing. While this is similar in some respects
- * to the semantics of a ServiceInterruption, it is applicable to only one document at a time, and also does not specify any delay period, since it is
- * presumed that the reason for the requeue is because of sequencing issues synchronized around an underlying event.
- *@param localIdentifier is the document identifier to requeue
- */
- @Override
- public void retryDocumentProcessing(String localIdentifier)
- throws ManifoldCFException
- {
- // Accumulate aborts
- abortSet.add(localIdentifier);
- }
-
- /** Create a global string from a simple string.
- *@param simpleString is the simple string.
- *@return a global string.
- */
- @Override
- public String createGlobalString(String simpleString)
- {
- return ManifoldCF.createGlobalString(simpleString);
- }
-
- /** Create a connection-specific string from a simple string.
- *@param simpleString is the simple string.
- *@return a connection-specific string.
- */
- @Override
- public String createConnectionSpecificString(String simpleString)
- {
- return ManifoldCF.createConnectionSpecificString(connectionName,simpleString);
- }
-
- /** Create a job-based string from a simple string.
- *@param simpleString is the simple string.
- *@return a job-specific string.
- */
- @Override
- public String createJobSpecificString(String simpleString)
- {
- return ManifoldCF.createJobSpecificString(jobID,simpleString);
- }
-
- }
-
/** Process activity class wraps access to the ingester and job queue.
*/
protected static class ProcessActivity implements IProcessActivity
@@ -1469,7 +1101,6 @@ public class WorkerThread extends Thread
protected final String[] legalLinkTypes;
protected final OutputActivity ingestLogger;
protected final IReprioritizationTracker rt;
- protected final Set<String> abortSet;
protected final String parameterVersion;
// We submit references in bulk, because that's way more efficient.
@@ -1484,6 +1115,15 @@ public class WorkerThread extends Thread
// Origination times
protected final Map<String,Long> originationTimes = new HashMap<String,Long>();
+ // Whether the document was aborted or not
+ protected final Set<String> abortSet = new HashSet<String>();
+
+ // Whether the document was checked or not
+ protected final Set<String> documentCheckedSet = new HashSet<String>();
+
+ // Whether document was deleted
+ protected final Set<String> documentDeletedSet = new HashSet<String>();
+
/** Constructor.
*@param jobManager is the job manager
*@param ingester is the ingester
@@ -1503,7 +1143,6 @@ public class WorkerThread extends Thread
int hopcountMode,
IRepositoryConnection connection, IRepositoryConnector connector,
IRepositoryConnectionManager connMgr, String[] legalLinkTypes, OutputActivity ingestLogger,
- Set<String> abortSet,
String parameterVersion)
{
this.jobID = jobID;
@@ -1526,7 +1165,6 @@ public class WorkerThread extends Thread
this.connMgr = connMgr;
this.legalLinkTypes = legalLinkTypes;
this.ingestLogger = ingestLogger;
- this.abortSet = abortSet;
this.parameterVersion = parameterVersion;
}
@@ -1541,6 +1179,27 @@ public class WorkerThread extends Thread
referenceList.clear();
}
+ /** Check whether a document (and its version string) was unchanged or not.
+ */
+ public boolean wasDocumentUnchanged(String documentIdentifier)
+ {
+ return documentCheckedSet.contains(documentIdentifier);
+ }
+
+ /** Check whether document was deleted or not.
+ */
+ public boolean wasDocumentDeleted(String documentIdentifier)
+ {
+ return documentDeletedSet.contains(documentIdentifier);
+ }
+
+ /** Check whether a document was aborted or not.
+ */
+ public boolean wasDocumentAborted(String documentIdentifier)
+ {
+ return abortSet.contains(documentIdentifier);
+ }
+
/** Add a document description to the current job's queue.
*@param localIdentifier is the local document identifier to add (for the connector that
* fetched the document).
@@ -1642,6 +1301,23 @@ public class WorkerThread extends Thread
existingDr.addPrerequisiteEvents(prereqEventNames);
}
+ /** Check if a document needs to be reindexed, based on a computed version string.
+ * Call this method to determine whether reindexing is necessary. Pass in a newly-computed version
+ * string. This method will return "true" if the document needs to be re-indexed.
+ *@param documentIdentifier is the document identifier.
+ *@param newVersionString is the newly-computed version string.
+ *@return true if the document needs to be reindexed.
+ */
+ @Override
+ public boolean checkDocumentNeedsReindexing(String documentIdentifier,
+ String newVersionString)
+ throws ManifoldCFException
+ {
+ String documentIdentifierHash = ManifoldCF.hash(documentIdentifier);
+ IPipelineSpecificationWithVersions spec = fetchPipelineSpecifications.get(documentIdentifierHash);
+ return ingester.checkFetchDocument(spec,newVersionString,parameterVersion,connection.getACLAuthority());
+ }
+
/** Add a document description to the current job's queue.
*@param localIdentifier is the local document identifier to add (for the connector that
* fetched the document).
@@ -1733,20 +1409,32 @@ public class WorkerThread extends Thread
return jobManager.retrieveParentDataAsFiles(jobID,ManifoldCF.hash(localIdentifier),dataName);
}
+ /** Note the fact that a document exists but is unchanged, and nothing further
+ * needs to be done to it.
+ * Call this method if it is determined that the document in question is identical to
+ * the formerly indexed document, AND when the version string for the document
+ * has not changed either.
+ */
+ @Override
+ public void noteUnchangedDocument(String documentIdentifier)
+ throws ManifoldCFException
+ {
+ documentCheckedSet.add(documentIdentifier);
+ }
+
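[Editor's example] Together with checkDocumentNeedsReindexing() above, this gives connectors the new incremental handshake: compute a version string, ask whether reindexing is needed, and record unchanged documents explicitly. A sketch of the intended pattern inside a connector's processDocuments() (hypothetical connector code; computeVersion() and ingest() are stand-ins for connector-specific logic):

    for (String documentIdentifier : documentIdentifiers)
    {
      String version = computeVersion(documentIdentifier);   // connector-specific
      if (!activity.checkDocumentNeedsReindexing(documentIdentifier, version))
      {
        // Identical to what was last indexed; just note it and move on.
        activity.noteUnchangedDocument(documentIdentifier);
        continue;
      }
      ingest(activity, documentIdentifier, version);          // fetch and index
    }
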
/** Record a document version, but don't ingest it.
- * ServiceInterruption is thrown if this action must be rescheduled.
*@param documentIdentifier is the document identifier.
*@param version is the document version.
*/
@Override
public void recordDocument(String documentIdentifier, String version)
- throws ManifoldCFException, ServiceInterruption
+ throws ManifoldCFException
{
String documentIdentifierHash = ManifoldCF.hash(documentIdentifier);
ingester.documentRecord(
pipelineSpecification.getBasicPipelineSpecification(),
connectionName,documentIdentifierHash,
- version,currentTime,ingestLogger);
+ version,currentTime);
}
/** Ingest the current document.
@@ -1821,21 +1509,20 @@ public class WorkerThread extends Thread
data,currentTime,
documentURI,
ingestLogger);
+
}
- /** Delete the current document from the search engine index, while keeping track of the version information
+ /** Remove the specified document from the search engine index, while keeping track of the version information
* for it (to reduce churn).
*@param documentIdentifier is the document's local identifier.
- *@param version is the version of the document, as reported by the getDocumentVersions() method of the
- * corresponding repository connector.
+ *@param version is the version string to be recorded for the document.
*/
- @Override
- public void deleteDocument(String documentIdentifier, String version)
+ public void noDocument(String documentIdentifier, String version)
throws ManifoldCFException, ServiceInterruption
{
- if (version.length() == 0)
- deleteDocument(documentIdentifier);
- else
+ // Special interpretation for empty version string; treat as if the document doesn't exist
+ // (by ignoring it and allowing it to be deleted later)
+ if (version.length() > 0)
{
try
{
@@ -1847,22 +1534,35 @@ public class WorkerThread extends Thread
throw new IllegalStateException("IngestDocumentWithException threw an illegal IOException: "+e.getMessage(),e);
}
}
+
}
- /** Delete the current document from the search engine index. This method does NOT keep track of version
- * information for the document and thus can lead to "churn", whereby the same document is queued, versioned,
- * and removed on subsequent crawls. It therefore should be considered to be deprecated, in favor of
- * deleteDocument(String localIdentifier, String version).
+ /** Delete the current document from the search engine index, while keeping track of the version information
+ * for it (to reduce churn).
+ * Use noDocument() above instead.
*@param documentIdentifier is the document's local identifier.
+ *@param version is the version string to be recorded for the document.
*/
@Override
- public void deleteDocument(String documentIdentifier)
+ @Deprecated
+ public void deleteDocument(String documentIdentifier, String version)
throws ManifoldCFException, ServiceInterruption
{
- String documentIdentifierHash = ManifoldCF.hash(documentIdentifier);
- ingester.documentDelete(pipelineSpecification.getBasicPipelineSpecification(),
- connectionName,documentIdentifierHash,
- ingestLogger);
+ noDocument(documentIdentifier,version);
+ }
+
+ /** Delete the specified document from the search engine index, and from the status table. This
+ * method does NOT keep track of version
+ * information for the document and thus can lead to "churn", whereby the same document is queued, processed,
+ * and removed on subsequent crawls. It is therefore preferable to use noDocument() instead,
+ * in any case where the same decision will need to be made over and over.
+ *@param documentIdentifier is the document's identifier.
+ */
+ @Override
+ public void deleteDocument(String documentIdentifier)
+ throws ManifoldCFException
+ {
+ documentDeletedSet.add(documentIdentifier);
}
/** Override the schedule for the next time a document is crawled.
@@ -2574,6 +2274,38 @@ public class WorkerThread extends Thread
}
+ /** The implementation of the IExistingVersions interface.
+ */
+ protected static class ExistingVersions implements IExistingVersions
+ {
+ protected final Map<String,QueuedDocument> map;
+ protected final String lastOutputConnectionName;
+
+ public ExistingVersions(String lastOutputConnectionName, List<QueuedDocument> list)
+ {
+ this.lastOutputConnectionName = lastOutputConnectionName;
+ this.map = new HashMap<String,QueuedDocument>();
+ for (QueuedDocument qd : list)
+ {
+ map.put(qd.getDocumentDescription().getDocumentIdentifier(),qd);
+ }
+ }
+
+ /** Retrieve an existing version string given a document identifier.
+ *@param documentIdentifier is the document identifier.
+ *@return the document version string, or null if the document was never previously indexed.
+ */
+ public String getIndexedVersionString(String documentIdentifier)
+ {
+ QueuedDocument qd = map.get(documentIdentifier);
+ DocumentIngestStatus status = qd.getLastIngestedStatus(lastOutputConnectionName);
+ if (status == null)
+ return null;
+ return status.getDocumentVersion();
+ }
+
+ }
+
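[Editor's example] Connectors receive this object as the IExistingVersions argument to processDocuments(), shown earlier in this diff; a minimal lookup sketch (hypothetical connector code):

    String previous = existingVersions.getIndexedVersionString(documentIdentifier);
    if (previous == null)
    {
      // Never indexed by the last output connection; process from scratch.
    }
    else if (previous.equals(newVersionString))
    {
      // Unchanged since last indexing; eligible for noteUnchangedDocument().
    }
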
/** The ingest logger class */
protected static class OutputActivity extends CheckActivity implements IOutputActivity
{