Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2012/08/12 18:26:04 UTC
svn commit: r1372143 [2/2] - in /manifoldcf/branches/CONNECTORS-501:
framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/
framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/
framework/pull-agent/src/main/java/...
Modified: manifoldcf/branches/CONNECTORS-501/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-501/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1372143&r1=1372142&r2=1372143&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-501/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java (original)
+++ manifoldcf/branches/CONNECTORS-501/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java Sun Aug 12 16:26:04 2012
@@ -78,25 +78,23 @@ public class WorkerThread extends Thread
List<QueuedDocument> finishList = new ArrayList<QueuedDocument>();
Map<String,Integer> idHashIndexMap = new HashMap<String,Integer>();
- // This is the place we accumulate documents to be deleted via the ingester; each member is a
- // document ID string.
- List<String> ingesterDeleteList = new ArrayList<String>();
- List<String> ingesterDeleteListUnhashed = new ArrayList<String>();
-
// This is where we accumulate the document QueuedDocuments to be deleted from the job queue.
- // These MUST be cleaned up after the ingester delete is called, in order to guarantee
- // cleanup happens! Also note that there will be items in here that are NOT in the ingester
- // delete list...
- List<QueuedDocument> jobmanagerDeleteList = new ArrayList<QueuedDocument>();
+ List<QueuedDocument> deleteList = new ArrayList<QueuedDocument>();
+ // This is where we accumulate documents that need to be placed in the HOPCOUNTREMOVED
+ // state.
+ List<QueuedDocument> hopcountremoveList = new ArrayList<QueuedDocument>();
+
+ // This is where we accumulate documents that need to be rescanned.
+ List<QueuedDocument> rescanList = new ArrayList<QueuedDocument>();
+
// This is where we store document ID strings of documents that need to be noted as having
// been checked.
List<String> ingesterCheckList = new ArrayList<String>();
- // Temporary list
- List<String> tempIDHashList = new ArrayList<String>();
- List<String> tempIDList = new ArrayList<String>();
-
+ // Holds any service-interruption exception thrown with "abort on fail" set; it is rethrown once batch cleanup is complete.
+ ManifoldCFException abortOnFail = null;
+
// Loop
while (true)
{
@@ -147,25 +145,30 @@ public class WorkerThread extends Thread
OutputActivity ingestLogger = new OutputActivity(connectionName,connMgr,outputName);
- // Put together document id's into an array, and versions into a map
- String[] docIDHashArray = new String[qds.getCount()];
- String[] docIDArray = new String[qds.getCount()];
- int i = 0;
- while (i < docIDArray.length)
+ // The flow through this section of the code is as follows.
+ // (1) We start with a list of documents
+ // (2) We attempt to do various things to these documents
+ // (3) Based on what happens, and on what errors occur, we progressively move documents out of the main list
+ // and into secondary lists, each of which is then handled in a uniform way
+
+ // First, initialize the active document set to contain everything.
+ List<QueuedDocument> activeDocuments = new ArrayList<QueuedDocument>(qds.getCount());
+
+ for (int i = 0; i < qds.getCount(); i++)
{
QueuedDocument qd = qds.getDocument(i);
- docIDHashArray[i] = qd.getDocumentDescription().getDocumentIdentifierHash();
- docIDArray[i] = qd.getDocumentDescription().getDocumentIdentifier();
- i++;
+ activeDocuments.add(qd);
}
+ // Clear out all of our disposition lists
fetchList.clear();
finishList.clear();
versionMap.clear();
- ingesterDeleteList.clear();
- ingesterDeleteListUnhashed.clear();
- jobmanagerDeleteList.clear();
+ deleteList.clear();
ingesterCheckList.clear();
+ hopcountremoveList.clear();
+ rescanList.clear();
+ abortOnFail = null;
// Keep track of the starting processing time, for statistics calculation
long processingStartTime = System.currentTimeMillis();
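As an aside, the flow described in steps (1)-(3) above reduces to a small, reusable pattern: every document starts in activeDocuments and is progressively moved into a disposition list (deleteList, hopcountremoveList, rescanList), and each list is then handled uniformly. A minimal, self-contained sketch of that pattern, with String standing in for QueuedDocument and both outcome tests purely hypothetical:

  import java.util.ArrayList;
  import java.util.List;

  public class DispositionSketch
  {
    // Mirrors the moveList() helper added at the bottom of this patch.
    static <T> void moveList(List<T> source, List<T> target)
    {
      target.addAll(source);
      source.clear();
    }

    public static void main(String[] args)
    {
      List<String> active = new ArrayList<String>();
      active.add("doc1");
      active.add("doc2");
      List<String> rescan = new ArrayList<String>();
      List<String> delete = new ArrayList<String>();

      // A batch-level failure moves every remaining document at once...
      boolean connectorUnavailable = false;       // hypothetical outcome
      if (connectorUnavailable)
        moveList(active, rescan);

      // ...while per-document outcomes peel documents off one at a time.
      List<String> stillActive = new ArrayList<String>(active.size());
      for (String doc : active)
      {
        if (doc.endsWith("2"))                    // hypothetical per-document test
          delete.add(doc);
        else
          stillActive.add(doc);
      }
      active = stillActive;
      // Each disposition list is then processed uniformly in its own step.
    }
  }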
@@ -175,101 +178,79 @@ public class WorkerThread extends Thread
{
long currentTime = System.currentTimeMillis();
- // Do the hopcount checks, if any. This will iteratively reduce the viable list of
- // document identifiers in need of having their versions fetched.
- String[] legalLinkTypes = RepositoryConnectorFactory.getRelationshipTypes(threadContext,connection.getClassName());
- // If this came back null, it means that there is no underlying implementation available, so treat this like a kind of service interruption.
- if (legalLinkTypes == null)
+ if (Logging.threads.isDebugEnabled())
+ Logging.threads.debug("Worker thread starting document count is "+Integer.toString(activeDocuments.size()));
+
+ // Get the legal link types. This is needed for later hopcount checking.
+ String[] legalLinkTypes = null;
+ if (activeDocuments.size() > 0)
{
- i = 0;
- while (i < qds.getCount())
+ legalLinkTypes = RepositoryConnectorFactory.getRelationshipTypes(threadContext,connection.getClassName());
+ // If this came back null, it means that there is no underlying implementation available, so treat this like a kind of service interruption.
+ if (legalLinkTypes == null)
{
- QueuedDocument qd = qds.getDocument(i++);
- DocumentDescription dd = qd.getDocumentDescription();
-
- jobManager.resetDocument(dd,0L,IJobManager.ACTION_RESCAN,-1L,-1);
- qd.setProcessed();
+ // Failure here puts all remaining documents into rescan list
+ moveList(activeDocuments,rescanList);
}
}
- else
+
+ if (Logging.threads.isDebugEnabled())
+ Logging.threads.debug(" Post-linktype document count is "+Integer.toString(activeDocuments.size()));
+
+ // Do the hopcount checks, if any. This will iteratively reduce the viable list of
+ // document identifiers in need of having their versions fetched.
+ if (legalLinkTypes != null && activeDocuments.size() > 0)
{
- // This is where to put the version answers.
- String[] currentVersions = new String[docIDHashArray.length];
- // This is a map of docIDHash to index, so I know where to put the versions as they
- // come back from multiple sources.
- idHashIndexMap.clear();
- int z = 0;
- while (z < docIDHashArray.length)
+ // Set up the current ID array
+ String[] currentDocIDHashArray = new String[activeDocuments.size()];
+ for (int i = 0; i < currentDocIDHashArray.length; i++)
{
- idHashIndexMap.put(docIDHashArray[z],new Integer(z));
- z++;
+ currentDocIDHashArray[i] = activeDocuments.get(i).getDocumentDescription().getDocumentIdentifierHash();
}
-
- String[] currentDocIDHashArray = docIDHashArray;
- String[] currentDocIDArray = docIDArray;
-
Map filterMap = job.getHopCountFilters();
Iterator filterIter = filterMap.keySet().iterator();
+ // Array to accumulate hopcount results for all link types
+ boolean[] overallResults = new boolean[currentDocIDHashArray.length];
+ for (int i = 0; i < overallResults.length; i++)
+ {
+ overallResults[i] = true;
+ }
+ // Calculate the hopcount result for each link type, and fold it in.
while (filterIter.hasNext())
{
String linkType = (String)filterIter.next();
int maxHop = (int)((Long)filterMap.get(linkType)).longValue();
boolean[] results = jobManager.findHopCounts(job.getID(),legalLinkTypes,currentDocIDHashArray,linkType,
maxHop,job.getHopcountMode());
- tempIDHashList.clear();
- tempIDList.clear();
- z = 0;
- while (z < currentDocIDHashArray.length)
- {
- if (results[z] == false)
- {
- // Chuck it
- System.out.println("Hopcount exceeded: Getting rid of doc hash ID "+currentDocIDHashArray[z]+" id "+currentDocIDArray[z]);
- currentVersions[idHashIndexMap.get(currentDocIDHashArray[z]).intValue()] = null;
- }
- else
- {
- tempIDHashList.add(currentDocIDHashArray[z]);
- tempIDList.add(currentDocIDArray[z]);
- }
- z++;
- }
-
- currentDocIDHashArray = new String[tempIDHashList.size()];
- currentDocIDArray = new String[tempIDList.size()];
- z = 0;
- while (z < currentDocIDHashArray.length)
+ for (int i = 0; i < results.length; i++)
{
- currentDocIDHashArray[z] = tempIDHashList.get(z);
- currentDocIDArray[z] = tempIDList.get(z);
- z++;
+ overallResults[i] = overallResults[i] && results[i];
}
}
-
- if (Logging.threads.isDebugEnabled())
- Logging.threads.debug("Worker thread post-hopcount pruned document count is "+Integer.toString(currentDocIDHashArray.length));
-
- // Set up the old version string array
- String[] oldVersionStringArray = new String[currentDocIDHashArray.length];
- z = 0;
- while (z < oldVersionStringArray.length)
+ // Move all documents to the appropriate list
+ List<QueuedDocument> newActiveSet = new ArrayList<QueuedDocument>(activeDocuments.size());
+ for (int i = 0; i < overallResults.length; i++)
{
- String idHashValue = currentDocIDHashArray[z];
- QueuedDocument qd = qds.getDocument(idHashIndexMap.get(idHashValue).intValue());
- DocumentIngestStatus dis = qd.getLastIngestedStatus();
- if (dis == null)
- oldVersionStringArray[z] = null;
+ if (overallResults[i] == false)
+ {
+ hopcountremoveList.add(activeDocuments.get(i));
+ }
else
{
- oldVersionStringArray[z] = dis.getDocumentVersion();
- if (oldVersionStringArray[z] == null)
- oldVersionStringArray[z] = "";
+ newActiveSet.add(activeDocuments.get(i));
}
- z++;
}
+ activeDocuments = newActiveSet;
+ }
- // Grab a connector handle
- IRepositoryConnector connector = RepositoryConnectorFactory.grab(threadContext,
+ if (Logging.threads.isDebugEnabled())
+ Logging.threads.debug(" Post-hopcount pruned document count is "+Integer.toString(activeDocuments.size()));
+
+ // From here on down we need a connector instance, so get one.
+ IRepositoryConnector connector = null;
+ if (activeDocuments.size() > 0 || hopcountremoveList.size() > 0)
+ {
+ connector = RepositoryConnectorFactory.grab(threadContext,
connection.getClassName(),
connection.getConfigParams(),
connection.getMaxConnections());
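The hopcount pruning above replaces the old per-linktype list rebuilding with a single boolean fold: jobManager.findHopCounts() yields one pass/fail vector per link type, and a document survives only if every filter accepts it. The fold in isolation, with made-up result vectors:

  import java.util.Arrays;
  import java.util.List;

  public class HopcountFoldSketch
  {
    // AND together one pass/fail vector per link type.
    static boolean[] fold(List<boolean[]> perLinkType, int docCount)
    {
      boolean[] overall = new boolean[docCount];
      Arrays.fill(overall, true);                 // assume every document passes
      for (boolean[] results : perLinkType)
      {
        for (int i = 0; i < docCount; i++)
          overall[i] = overall[i] && results[i];  // fold each filter in
      }
      return overall;
    }

    public static void main(String[] args)
    {
      boolean[] linkTypeA = {true, true, false};
      boolean[] linkTypeB = {true, false, true};
      System.out.println(Arrays.toString(fold(Arrays.asList(linkTypeA, linkTypeB), 3)));
      // Prints [true, false, false]: only document 0 passes both filters.
    }
  }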
@@ -279,52 +260,66 @@ public class WorkerThread extends Thread
// must be requeued for immediate reprocessing. When the rest of the world figures out that the job that owns this
// document is in fact unable to function, we'll stop getting such documents handed to us, because the state of the
// job will be changed.
+
if (connector == null)
{
- i = 0;
- while (i < qds.getCount())
- {
- QueuedDocument qd = qds.getDocument(i++);
- DocumentDescription dd = qd.getDocumentDescription();
-
- jobManager.resetDocument(dd,0L,IJobManager.ACTION_RESCAN,-1L,-1);
- qd.setProcessed();
- }
+ // Failure here puts all remaining documents into rescan list
+ moveList(activeDocuments,rescanList);
+ moveList(hopcountremoveList,rescanList);
}
- else
+ }
+
+ if (connector != null)
+ {
+ // Open try/finally block to free the connector instance no matter what
+ try
{
- // System.out.println(" Got a connector in thread "+id);
- try
+ // Check for interruption before we start fetching
+ if (Thread.currentThread().isInterrupted())
+ throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
+
+ if (activeDocuments.size() > 0)
{
+ // === Set up for the document version fetch ===
+ String[] currentDocIDHashArray = new String[activeDocuments.size()];
+ String[] currentDocIDArray = new String[activeDocuments.size()];
+ String[] oldVersionStringArray = new String[activeDocuments.size()];
- if (Thread.currentThread().isInterrupted())
- throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
+ for (int i = 0; i < activeDocuments.size(); i++)
+ {
+ QueuedDocument qd = activeDocuments.get(i);
+ currentDocIDHashArray[i] = qd.getDocumentDescription().getDocumentIdentifierHash();
+ currentDocIDArray[i] = qd.getDocumentDescription().getDocumentIdentifier();
+ DocumentIngestStatus dis = qd.getLastIngestedStatus();
+ if (dis == null)
+ oldVersionStringArray[i] = null;
+ else
+ {
+ oldVersionStringArray[i] = dis.getDocumentVersion();
+ if (oldVersionStringArray[i] == null)
+ oldVersionStringArray[i] = "";
+ }
+ }
// Get the output version string.
String outputVersion = ingester.getOutputDescription(outputName,outputSpec);
HashMap abortSet = new HashMap();
- ProcessActivity activity;
VersionActivity versionActivity = new VersionActivity(connectionName,connMgr,jobManager,job,ingester,abortSet,outputVersion);
String aclAuthority = connection.getACLAuthority();
boolean isDefaultAuthority = (aclAuthority == null || aclAuthority.length() == 0);
- // Fetch documents (if we need to)
+ if (Logging.threads.isDebugEnabled())
+ Logging.threads.debug("Worker thread getting versions for "+Integer.toString(currentDocIDArray.length)+" documents");
+
+ // === Fetch documents ===
+ // We start by getting the document version string.
+ String[] newVersionStringArray = null;
try
{
- if (Logging.threads.isDebugEnabled())
- Logging.threads.debug("Worker thread getting versions for "+Integer.toString(currentDocIDArray.length)+" documents");
-
- String[] newCurrentVersions = connector.getDocumentVersions(currentDocIDArray,oldVersionStringArray,
+ newVersionStringArray = connector.getDocumentVersions(currentDocIDArray,oldVersionStringArray,
versionActivity,spec,jobType,isDefaultAuthority);
- z = 0;
- while (z < currentDocIDHashArray.length)
- {
- currentVersions[((Integer)idHashIndexMap.get(currentDocIDHashArray[z])).intValue()] =
- newCurrentVersions[z];
- z++;
- }
if (Logging.threads.isDebugEnabled())
Logging.threads.debug("Worker thread done getting versions for "+Integer.toString(currentDocIDArray.length)+" documents");
@@ -342,10 +337,10 @@ public class WorkerThread extends Thread
// Mark the current documents to be recrawled at the
// time specified, with the proper error handling.
- i = 0;
- while (i < qds.getCount())
+ List<QueuedDocument> newActiveList = new ArrayList<QueuedDocument>(activeDocuments.size());
+ for (int i = 0; i < activeDocuments.size(); i++)
{
- QueuedDocument qd = qds.getDocument(i++);
+ QueuedDocument qd = activeDocuments.get(i);
DocumentDescription dd = qd.getDocumentDescription();
// If either we are going to be requeuing beyond the fail time, OR
// the number of retries available has hit 0, THEN we treat this
@@ -355,495 +350,473 @@ public class WorkerThread extends Thread
{
// Treat this as a hard failure.
if (e.isAbortOnFail())
- throw new ManifoldCFException("Repeated service interruptions - failure getting document version"+((e.getCause()!=null)?": "+e.getCause().getMessage():""),e.getCause());
+ {
+ abortOnFail = new ManifoldCFException("Repeated service interruptions - failure getting document version"+((e.getCause()!=null)?": "+e.getCause().getMessage():""),e.getCause());
+ rescanList.add(qd);
+ }
// We want this particular document to be not included in the
// reprocessing. Therefore, we do the same thing as we would
// if we got back a null version.
- DocumentIngestStatus oldDocStatus = qd.getLastIngestedStatus();
- String documentIDHash = dd.getDocumentIdentifierHash();
- // See if we need to delete
- if (oldDocStatus != null)
- {
- // Queue up to issue deletion
- ingesterDeleteList.add(documentIDHash);
- ingesterDeleteListUnhashed.add(dd.getDocumentIdentifier());
- }
- jobmanagerDeleteList.add(qd);
+ deleteList.add(qd);
}
else
{
+ // Retry this document according to the parameters provided.
jobManager.resetDocument(dd,e.getRetryTime(),
IJobManager.ACTION_RESCAN,e.getFailTime(),e.getFailRetryCount());
qd.setProcessed();
}
}
-
- processDeleteLists(outputName,connector,connection,jobManager,
- jobmanagerDeleteList,ingester,ingesterDeleteList,ingesterDeleteListUnhashed,
- job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),queueTracker,currentTime);
-
- // Done processing this set of documents; they've either been
- // requeued, deleted, or an exception has been thrown...
- continue;
+
+ // All active documents have been removed from the list
+ activeDocuments.clear();
+
}
- // Organize what we need for document status comparison, and get it into a canonical form.
- String newOutputVersion = outputVersion;
- if (newOutputVersion == null)
- newOutputVersion = "";
-
- try
+ // If the version fetch was successful, then go on to the processing phase
+ if (newVersionStringArray != null)
{
- // Loop through documents now, and amass what we need to fetch.
- // We also need to tally: (1) what needs to be marked as deleted via
- // jobManager.markDocumentDeleted();
- // (2) what needs to be noted as a deletion to ingester
- // (3) what needs to be noted as a check for the ingester
- i = 0;
- while (i < currentVersions.length)
+ // This try{ } is for releasing document versions at the connector level.
+ try
{
- QueuedDocument qd = qds.getDocument(i);
- DocumentDescription dd = qd.getDocumentDescription();
- // If this document was aborted, then treat it specially; we never go on to fetch it, for one thing.
- if (abortSet.get(dd.getDocumentIdentifier()) != null)
- {
- // Special treatment for aborted documents.
- // We ignore the returned version string completely, since it's presumed that processing was not completed for this doc.
- // We want to give up immediately on this one, and just requeue it for immediate reprocessing (pending its prereqs being all met).
- // Add to the finish list, so it gets requeued. Because the document is already marked as aborted, this should be enough to cause an
- // unconditional requeue.
- finishList.add(qd);
- }
- else
+ // Organize what we need for document status comparison, and get it into a canonical form.
+ String newOutputVersion = outputVersion;
+ if (newOutputVersion == null)
+ newOutputVersion = "";
+
+ // Loop through documents now, and amass what we need to fetch.
+ // We also need to tally: (1) what needs to be marked as deleted via
+ // jobManager.markDocumentDeleted();
+ // (2) what needs to be noted as a deletion to ingester
+ // (3) what needs to be noted as a check for the ingester
+ for (int i = 0; i < activeDocuments.size(); i++)
{
- DocumentIngestStatus oldDocStatus = qd.getLastIngestedStatus();
- String documentIDHash = dd.getDocumentIdentifierHash();
- String newDocVersion = currentVersions[i];
- String newAuthorityName = aclAuthority;
- if (newAuthorityName == null)
- newAuthorityName = "";
-
- versionMap.put(dd.getDocumentIdentifierHash(),newDocVersion);
-
-
- if (newDocVersion == null)
+ QueuedDocument qd = activeDocuments.get(i);
+ DocumentDescription dd = qd.getDocumentDescription();
+ // If this document was aborted, then treat it specially; we never go on to fetch it, for one thing.
+ if (abortSet.get(dd.getDocumentIdentifier()) != null)
{
- // See if we need to delete
- if (oldDocStatus != null)
- {
- // Queue up to issue deletion
- ingesterDeleteList.add(documentIDHash);
- ingesterDeleteListUnhashed.add(dd.getDocumentIdentifier());
- }
- // We always add to the jobqueue delete list regardless.
- jobmanagerDeleteList.add(qd);
+ // Special treatment for aborted documents.
+ // We ignore the returned version string completely, since it's presumed that processing was not completed for this doc.
+ // We want to give up immediately on this one, and just requeue it for immediate reprocessing (pending its prereqs being all met).
+ // Add to the finish list, so it gets requeued. Because the document is already marked as aborted, this should be enough to cause an
+ // unconditional requeue.
+ finishList.add(qd);
}
else
{
- // Not getting deleted, so we must do the finish processing (i.e. conditionally requeue), so note that.
- finishList.add(qd);
+ DocumentIngestStatus oldDocStatus = qd.getLastIngestedStatus();
+ String documentIDHash = dd.getDocumentIdentifierHash();
+ String newDocVersion = newVersionStringArray[i];
+ String newAuthorityName = aclAuthority;
+ if (newAuthorityName == null)
+ newAuthorityName = "";
- // See if we need to add, or update.
- boolean allowIngest = false;
- if (oldDocStatus == null)
+ versionMap.put(dd.getDocumentIdentifierHash(),newDocVersion);
+
+ if (newDocVersion == null)
{
- // Add
- allowIngest = true;
- // Fall through to allow the processing
+ deleteList.add(qd);
}
else
{
- // Update. There are two possibilities here. (1) the same version
- // that was there before is there now (which may mean a rescan),
- // or (2) there are different versions (which ALWAYS means a rescan).
- String oldDocVersion = oldDocStatus.getDocumentVersion();
- if (oldDocVersion == null)
- oldDocVersion = "";
- String oldAuthorityName = oldDocStatus.getDocumentAuthorityNameString();
- if (oldAuthorityName == null)
- oldAuthorityName = "";
- String oldOutputVersion = oldDocStatus.getOutputVersion();
- if (oldOutputVersion == null)
- oldOutputVersion = "";
+ // Not getting deleted, so we must do the finish processing (i.e. conditionally requeue), so note that.
+ finishList.add(qd);
- // Start the comparison processing
- if (newDocVersion.length() == 0)
+ // See if we need to add, or update.
+ boolean allowIngest = false;
+ if (oldDocStatus == null)
{
- // Always reingest
+ // Add
allowIngest = true;
+ // Fall through to allow the processing
}
- else if (oldDocVersion.equals(newDocVersion) &&
- oldAuthorityName.equals(newAuthorityName) &&
- oldOutputVersion.equals(newOutputVersion))
+ else
{
- // The old logic was as follows:
- //
- // If the crawl is an incremental crawl, then we do NOT add this
- // document to the fetch list, even for scanning and no ingestion.
- // But we *do* add it, scan only, if this was a "full crawl".
- //
- // Apparently this was designed to prevent a document that had
- // already been processed and had queued stuff from causing deletions
- // under 'full scan' conditions, because those child documents would
- // not be requeued then. This contrasts with the incremental case,
- // where we really don't want to refetch the document simply to find
- // children - or do we? The connector has to make that decision, it
- // seems to me. If it's the kind of document that might have children,
- // then rescanning is warranted under ANY conditions; if it's not,
- // then the connector can decide to just do nothing.
- //
- // For the kinds of connectors where all documents have children,
- // preventing the fetch is not likely to help much. These kinds of
- // connectors (rss and web) depend on the document checksum to
- // determine version anyway, so the document is fetched regardless.
- // At least we prevent the ingestion.
+ // Update. There are two possibilities here. (1) the same version
+ // that was there before is there now (which may mean a rescan),
+ // or (2) there are different versions (which ALWAYS means a rescan).
+ String oldDocVersion = oldDocStatus.getDocumentVersion();
+ if (oldDocVersion == null)
+ oldDocVersion = "";
+ String oldAuthorityName = oldDocStatus.getDocumentAuthorityNameString();
+ if (oldAuthorityName == null)
+ oldAuthorityName = "";
+ String oldOutputVersion = oldDocStatus.getOutputVersion();
+ if (oldOutputVersion == null)
+ oldOutputVersion = "";
- // Fall through to allow the scanning, but not the ingest
+ // Start the comparison processing
+ if (newDocVersion.length() == 0)
+ {
+ // Always reingest
+ allowIngest = true;
+ }
+ else if (oldDocVersion.equals(newDocVersion) &&
+ oldAuthorityName.equals(newAuthorityName) &&
+ oldOutputVersion.equals(newOutputVersion))
+ {
+ // The old logic was as follows:
+ //
+ // If the crawl is an incremental crawl, then we do NOT add this
+ // document to the fetch list, even for scanning and no ingestion.
+ // But we *do* add it, scan only, if this was a "full crawl".
+ //
+ // Apparently this was designed to prevent a document that had
+ // already been processed and had queued stuff from causing deletions
+ // under 'full scan' conditions, because those child documents would
+ // not be requeued then. This contrasts with the incremental case,
+ // where we really don't want to refetch the document simply to find
+ // children - or do we? The connector has to make that decision, it
+ // seems to me. If it's the kind of document that might have children,
+ // then rescanning is warranted under ANY conditions; if it's not,
+ // then the connector can decide to just do nothing.
+ //
+ // For the kinds of connectors where all documents have children,
+ // preventing the fetch is not likely to help much. These kinds of
+ // connectors (rss and web) depend on the document checksum to
+ // determine version anyway, so the document is fetched regardless.
+ // At least we prevent the ingestion.
+
+ // Fall through to allow the scanning, but not the ingest
+ }
+ else
+ allowIngest = true;
}
- else
- allowIngest = true;
- }
- fetchList.add(new DocumentToProcess(qd,!allowIngest));
- if (!allowIngest)
- ingesterCheckList.add(documentIDHash);
+ fetchList.add(new DocumentToProcess(qd,!allowIngest));
+ if (!allowIngest)
+ ingesterCheckList.add(documentIDHash);
+ }
}
- }
-
- // Next version string!
- i++;
- }
+ }
+ activeDocuments.clear();
- // Done figuring out what to process. Now, we need to process it.
+ // We are done transferring documents from activeDocuments to the other lists for processing.
+ // Those lists will all need to be processed, but the fetch list is special because it
+ // must be processed in the same context as the version fetch.
- // Clear the documents out of the queue
- processDeleteLists(outputName,connector,connection,jobManager,
- jobmanagerDeleteList,ingester,ingesterDeleteList,ingesterDeleteListUnhashed,
- job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),queueTracker,currentTime);
-
- // First, make the things we will need.
- activity = new ProcessActivity(threadContext,queueTracker,jobManager,ingester,
- currentTime,job,connection,connector,connMgr,legalLinkTypes,ingestLogger,abortSet,outputVersion);
- try
- {
- // Fetchlist contains what we need to process.
- if (fetchList.size() > 0)
+ // Note the documents that have been checked but not reingested. This should happen BEFORE we need
+ // the statistics (which are calculated during the finishlist step below)
+ if (ingesterCheckList.size() > 0)
{
-
-
- // Build a list of id's and flags
- String[] processIDs = new String[fetchList.size()];
- String[] processIDHashes = new String[fetchList.size()];
- String[] versions = new String[fetchList.size()];
- boolean[] scanOnly = new boolean[fetchList.size()];
-
- i = 0;
- while (i < fetchList.size())
+ String[] checkClasses = new String[ingesterCheckList.size()];
+ String[] checkIDs = new String[ingesterCheckList.size()];
+ for (int i = 0; i < checkIDs.length; i++)
{
- DocumentToProcess dToP = fetchList.get(i);
- DocumentDescription dd = dToP.getDocument().getDocumentDescription();
- processIDs[i] = dd.getDocumentIdentifier();
- processIDHashes[i] = dd.getDocumentIdentifierHash();
- versions[i] = versionMap.get(dd.getDocumentIdentifierHash());
- scanOnly[i] = dToP.getScanOnly();
- i++;
+ checkClasses[i] = connectionName;
+ checkIDs[i] = ingesterCheckList.get(i);
}
+ ingester.documentCheckMultiple(outputName,checkClasses,checkIDs,currentTime);
+ }
- // Now, process in bulk
- try
+ // First, make the things we will need for all subsequent steps.
+ ProcessActivity activity = new ProcessActivity(threadContext,queueTracker,jobManager,ingester,
+ currentTime,job,connection,connector,connMgr,legalLinkTypes,ingestLogger,abortSet,outputVersion);
+ try
+ {
+
+ // Fetchlist contains what we need to process.
+ if (fetchList.size() > 0)
{
+ // Build a list of id's and flags
+ String[] processIDs = new String[fetchList.size()];
+ String[] processIDHashes = new String[fetchList.size()];
+ String[] versions = new String[fetchList.size()];
+ boolean[] scanOnly = new boolean[fetchList.size()];
+
+ for (int i = 0; i < fetchList.size(); i++)
+ {
+ DocumentToProcess dToP = fetchList.get(i);
+ DocumentDescription dd = dToP.getDocument().getDocumentDescription();
+ processIDs[i] = dd.getDocumentIdentifier();
+ processIDHashes[i] = dd.getDocumentIdentifierHash();
+ versions[i] = versionMap.get(dd.getDocumentIdentifierHash());
+ scanOnly[i] = dToP.getScanOnly();
+ }
+
if (Thread.currentThread().isInterrupted())
throw new ManifoldCFException("Interrupted",ManifoldCFException.INTERRUPTED);
if (Logging.threads.isDebugEnabled())
Logging.threads.debug("Worker thread about to process "+Integer.toString(processIDs.length)+" documents");
- connector.processDocuments(processIDs,versions,activity,job.getSpecification(),scanOnly,jobType);
+ // Now, process in bulk
+ try
+ {
- // Flush remaining references into the database!
- activity.flush();
+ connector.processDocuments(processIDs,versions,activity,job.getSpecification(),scanOnly,jobType);
- // "Finish" the documents (removing unneeded carrydown info, etc.)
- DocumentDescription[] requeueCandidates = jobManager.finishDocuments(job.getID(),legalLinkTypes,processIDHashes,job.getHopcountMode());
+ // Flush remaining references into the database!
+ activity.flush();
- ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,connector,connection,queueTracker,currentTime);
+ // "Finish" the documents (removing unneeded carrydown info, etc.)
+ DocumentDescription[] requeueCandidates = jobManager.finishDocuments(job.getID(),legalLinkTypes,processIDHashes,job.getHopcountMode());
- if (Logging.threads.isDebugEnabled())
- Logging.threads.debug("Worker thread done processing "+Integer.toString(processIDs.length)+" documents");
+ ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,connector,connection,queueTracker,currentTime);
- }
- catch (ServiceInterruption e)
- {
- // This service interruption could have resulted
- // after some or all of the documents ingested.
- // They will therefore need to go into the PENDINGPURGATORY
- // state.
-
- Logging.jobs.warn("Service interruption reported for job "+
- job.getID()+" connection '"+job.getConnectionName()+"': "+
- e.getMessage());
-
- // Mark the current documents to be recrawled in the
- // time specified, except for the ones beyond their limits.
- // Those will either be deleted, or an exception will be thrown that
- // will abort the current job.
-
- ingesterDeleteList.clear();
- ingesterDeleteListUnhashed.clear();
- jobmanagerDeleteList.clear();
- ArrayList requeueList = new ArrayList();
+ if (Logging.threads.isDebugEnabled())
+ Logging.threads.debug("Worker thread done processing "+Integer.toString(processIDs.length)+" documents");
- i = 0;
- while (i < finishList.size())
+ }
+ catch (ServiceInterruption e)
{
- QueuedDocument qd = finishList.get(i++);
- DocumentDescription dd = qd.getDocumentDescription();
- if (dd.getFailTime() != -1L && dd.getFailTime() < e.getRetryTime() ||
- dd.getFailRetryCount() == 0)
+ // This service interruption could have resulted
+ // after some or all of the documents ingested.
+ // They will therefore need to go into the PENDINGPURGATORY
+ // state.
+
+ Logging.jobs.warn("Service interruption reported for job "+
+ job.getID()+" connection '"+job.getConnectionName()+"': "+
+ e.getMessage());
+
+ // Mark the current documents to be recrawled in the
+ // time specified, except for the ones beyond their limits.
+ // Those will either be deleted, or an exception will be thrown that
+ // will abort the current job.
+
+ deleteList.clear();
+ ArrayList requeueList = new ArrayList();
+
+ for (int i = 0; i < finishList.size(); i++)
{
- // Treat this as a hard failure.
- if (e.isAbortOnFail())
- throw new ManifoldCFException("Repeated service interruptions - failure processing document"+((e.getCause()!=null)?": "+e.getCause().getMessage():""),e.getCause());
- // We want this particular document to be not included in the
- // reprocessing. Therefore, we do the same thing as we would
- // if we got back a null version.
- DocumentIngestStatus oldDocStatus = qd.getLastIngestedStatus();
- String documentIDHash = dd.getDocumentIdentifierHash();
- // See if we need to delete
- if (oldDocStatus != null)
+ QueuedDocument qd = finishList.get(i);
+ DocumentDescription dd = qd.getDocumentDescription();
+ if (dd.getFailTime() != -1L && dd.getFailTime() < e.getRetryTime() ||
+ dd.getFailRetryCount() == 0)
{
- // Queue up to issue deletion
- ingesterDeleteList.add(documentIDHash);
- ingesterDeleteListUnhashed.add(dd.getDocumentIdentifier());
+ // Treat this as a hard failure.
+ if (e.isAbortOnFail())
+ {
+ abortOnFail = new ManifoldCFException("Repeated service interruptions - failure processing document"+((e.getCause()!=null)?": "+e.getCause().getMessage():""),e.getCause());
+ rescanList.add(qd);
+ }
+ // We want this particular document to be not included in the
+ // reprocessing. Therefore, we do the same thing as we would
+ // if we got back a null version.
+ deleteList.add(qd);
+ }
+ else
+ {
+ requeueList.add(qd);
}
- jobmanagerDeleteList.add(qd);
- }
- else
- {
- requeueList.add(qd);
}
- }
-
- // Requeue the documents we've identified
- requeueDocuments(jobManager,requeueList,e.getRetryTime(),e.getFailTime(),
- e.getFailRetryCount());
-
- // Process the deletions too.
- processDeleteLists(outputName,connector,connection,jobManager,
- jobmanagerDeleteList,ingester,ingesterDeleteList,ingesterDeleteListUnhashed,
- job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),queueTracker,currentTime);
- continue;
- }
- }
+ // Requeue the documents we've identified
+ requeueDocuments(jobManager,requeueList,e.getRetryTime(),e.getFailTime(),
+ e.getFailRetryCount());
- // Note the documents that have been checked but not reingested. This should happen BEFORE we need
- // the statistics (which are calculated during the finishlist step below)
- if (ingesterCheckList.size() > 0)
- {
- String[] checkClasses = new String[ingesterCheckList.size()];
- String[] checkIDs = new String[ingesterCheckList.size()];
- i = 0;
- while (i < checkIDs.length)
- {
- checkClasses[i] = connectionName;
- checkIDs[i] = ingesterCheckList.get(i);
- i++;
- }
- ingester.documentCheckMultiple(outputName,checkClasses,checkIDs,currentTime);
- }
+ }
+ } // End of fetching
- // Go through the finish list and either mark completed, or requeue, depending on the kind of job this is.
- if (finishList.size() > 0)
- {
- // In both job types, we have to go through the finishList to figure out what to do with the documents.
- // In the case of a document that was aborted, we must requeue it for immediate reprocessing in BOTH job types.
- switch (job.getType())
+ if (finishList.size() > 0)
{
- case IJobDescription.TYPE_CONTINUOUS:
+ // In both job types, we have to go through the finishList to figure out what to do with the documents.
+ // In the case of a document that was aborted, we must requeue it for immediate reprocessing in BOTH job types.
+ switch (job.getType())
{
- // We need to populate timeArray
- String[] timeIDClasses = new String[finishList.size()];
- String[] timeIDHashes = new String[finishList.size()];
- i = 0;
- while (i < timeIDHashes.length)
- {
- QueuedDocument qd = (QueuedDocument)finishList.get(i);
- DocumentDescription dd = qd.getDocumentDescription();
- String documentIDHash = dd.getDocumentIdentifierHash();
- timeIDClasses[i] = connectionName;
- timeIDHashes[i] = documentIDHash;
- i++;
- }
- long[] timeArray = ingester.getDocumentUpdateIntervalMultiple(outputName,timeIDClasses,timeIDHashes);
- Long[] recheckTimeArray = new Long[timeArray.length];
- int[] actionArray = new int[timeArray.length];
- DocumentDescription[] recrawlDocs = new DocumentDescription[finishList.size()];
- i = 0;
- while (i < finishList.size())
+ case IJobDescription.TYPE_CONTINUOUS:
{
- QueuedDocument qd = finishList.get(i);
- recrawlDocs[i] = qd.getDocumentDescription();
- String documentID = recrawlDocs[i].getDocumentIdentifier();
-
- // If aborted due to sequencing issue, then requeue for reprocessing immediately, ignoring everything else.
- boolean wasAborted = abortSet.get(documentID) != null;
- if (wasAborted)
+ // We need to populate timeArray
+ String[] timeIDClasses = new String[finishList.size()];
+ String[] timeIDHashes = new String[finishList.size()];
+ for (int i = 0; i < timeIDHashes.length; i++)
{
- // Requeue for immediate reprocessing
- if (Logging.scheduling.isDebugEnabled())
- Logging.scheduling.debug("Document '"+documentID+"' will be RESCANNED as soon as prerequisites are met");
-
- actionArray[i] = IJobManager.ACTION_RESCAN;
- recheckTimeArray[i] = new Long(0L); // Must not use null; that means 'never'.
+ QueuedDocument qd = (QueuedDocument)finishList.get(i);
+ DocumentDescription dd = qd.getDocumentDescription();
+ String documentIDHash = dd.getDocumentIdentifierHash();
+ timeIDClasses[i] = connectionName;
+ timeIDHashes[i] = documentIDHash;
}
- else
+ long[] timeArray = ingester.getDocumentUpdateIntervalMultiple(outputName,timeIDClasses,timeIDHashes);
+ Long[] recheckTimeArray = new Long[timeArray.length];
+ int[] actionArray = new int[timeArray.length];
+ DocumentDescription[] recrawlDocs = new DocumentDescription[finishList.size()];
+ for (int i = 0; i < finishList.size(); i++)
{
- // Calculate the next time to run, or time to expire.
-
- // For run time, the formula is to calculate the running avg interval between changes,
- // add an additional interval (which comes from the job description),
- // and add that to the current time.
- // One caveat: we really want to calculate the interval from the last
- // time change was detected, but this is not implemented yet.
- long timeAmt = timeArray[i];
- // null value indicates never to schedule
-
- Long recrawlTime = activity.calculateDocumentRescheduleTime(currentTime,timeAmt,documentID);
- Long expireTime = activity.calculateDocumentExpireTime(currentTime,documentID);
-
-
- // Merge the two times together. We decide on the action based on the action with the lowest time.
- if (expireTime == null || (recrawlTime != null && recrawlTime.longValue() < expireTime.longValue()))
+ QueuedDocument qd = finishList.get(i);
+ recrawlDocs[i] = qd.getDocumentDescription();
+ String documentID = recrawlDocs[i].getDocumentIdentifier();
+
+ // If aborted due to sequencing issue, then requeue for reprocessing immediately, ignoring everything else.
+ boolean wasAborted = abortSet.get(documentID) != null;
+ if (wasAborted)
{
+ // Requeue for immediate reprocessing
if (Logging.scheduling.isDebugEnabled())
- Logging.scheduling.debug("Document '"+documentID+"' will be RESCANNED at "+recrawlTime.toString());
- recheckTimeArray[i] = recrawlTime;
+ Logging.scheduling.debug("Document '"+documentID+"' will be RESCANNED as soon as prerequisites are met");
+
actionArray[i] = IJobManager.ACTION_RESCAN;
- }
- else if (recrawlTime == null || (expireTime != null && recrawlTime.longValue() > expireTime.longValue()))
- {
- if (Logging.scheduling.isDebugEnabled())
- Logging.scheduling.debug("Document '"+documentID+"' will be REMOVED at "+expireTime.toString());
- recheckTimeArray[i] = expireTime;
- actionArray[i] = IJobManager.ACTION_REMOVE;
+ recheckTimeArray[i] = new Long(0L); // Must not use null; that means 'never'.
}
else
{
- // Default activity if conflict will be rescan
- if (Logging.scheduling.isDebugEnabled() && recrawlTime != null)
- Logging.scheduling.debug("Document '"+documentID+"' will be RESCANNED at "+recrawlTime.toString());
- recheckTimeArray[i] = recrawlTime;
- actionArray[i] = IJobManager.ACTION_RESCAN;
+ // Calculate the next time to run, or time to expire.
+
+ // For run time, the formula is to calculate the running avg interval between changes,
+ // add an additional interval (which comes from the job description),
+ // and add that to the current time.
+ // One caveat: we really want to calculate the interval from the last
+ // time change was detected, but this is not implemented yet.
+ long timeAmt = timeArray[i];
+ // null value indicates never to schedule
+
+ Long recrawlTime = activity.calculateDocumentRescheduleTime(currentTime,timeAmt,documentID);
+ Long expireTime = activity.calculateDocumentExpireTime(currentTime,documentID);
+
+
+ // Merge the two times together. We decide on the action based on the action with the lowest time.
+ if (expireTime == null || (recrawlTime != null && recrawlTime.longValue() < expireTime.longValue()))
+ {
+ if (Logging.scheduling.isDebugEnabled())
+ Logging.scheduling.debug("Document '"+documentID+"' will be RESCANNED at "+recrawlTime.toString());
+ recheckTimeArray[i] = recrawlTime;
+ actionArray[i] = IJobManager.ACTION_RESCAN;
+ }
+ else if (recrawlTime == null || (expireTime != null && recrawlTime.longValue() > expireTime.longValue()))
+ {
+ if (Logging.scheduling.isDebugEnabled())
+ Logging.scheduling.debug("Document '"+documentID+"' will be REMOVED at "+expireTime.toString());
+ recheckTimeArray[i] = expireTime;
+ actionArray[i] = IJobManager.ACTION_REMOVE;
+ }
+ else
+ {
+ // Default activity if conflict will be rescan
+ if (Logging.scheduling.isDebugEnabled() && recrawlTime != null)
+ Logging.scheduling.debug("Document '"+documentID+"' will be RESCANNED at "+recrawlTime.toString());
+ recheckTimeArray[i] = recrawlTime;
+ actionArray[i] = IJobManager.ACTION_RESCAN;
+ }
}
}
- i++;
- }
- jobManager.requeueDocumentMultiple(recrawlDocs,recheckTimeArray,actionArray);
+ jobManager.requeueDocumentMultiple(recrawlDocs,recheckTimeArray,actionArray);
- }
- break;
- case IJobDescription.TYPE_SPECIFIED:
- {
- // Separate the ones we actually finished from the ones we need to requeue because they were aborted
- List<DocumentDescription> completedList = new ArrayList<DocumentDescription>();
- List<DocumentDescription> abortedList = new ArrayList<DocumentDescription>();
- i = 0;
- while (i < finishList.size())
+ }
+ break;
+ case IJobDescription.TYPE_SPECIFIED:
{
- QueuedDocument qd = finishList.get(i++);
- DocumentDescription dd = qd.getDocumentDescription();
- if (abortSet.get(dd.getDocumentIdentifier()) != null)
- {
- // The document was aborted, so put it into the abortedList
- abortedList.add(dd);
- }
- else
+ // Separate the ones we actually finished from the ones we need to requeue because they were aborted
+ List<DocumentDescription> completedList = new ArrayList<DocumentDescription>();
+ List<DocumentDescription> abortedList = new ArrayList<DocumentDescription>();
+ for (int i = 0; i < finishList.size(); i++)
{
- // The document was completed.
- completedList.add(dd);
+ QueuedDocument qd = finishList.get(i);
+ DocumentDescription dd = qd.getDocumentDescription();
+ if (abortSet.get(dd.getDocumentIdentifier()) != null)
+ {
+ // The document was aborted, so put it into the abortedList
+ abortedList.add(dd);
+ }
+ else
+ {
+ // The document was completed.
+ completedList.add(dd);
+ }
}
- }
- // Requeue the ones that must be repeated
- if (abortedList.size() > 0)
- {
- DocumentDescription[] docDescriptions = new DocumentDescription[abortedList.size()];
- Long[] recheckTimeArray = new Long[docDescriptions.length];
- int[] actionArray = new int[docDescriptions.length];
- i = 0;
- while (i < docDescriptions.length)
+ // Requeue the ones that must be repeated
+ if (abortedList.size() > 0)
{
- docDescriptions[i] = abortedList.get(i);
- recheckTimeArray[i] = new Long(0L);
- actionArray[i] = IJobManager.ACTION_RESCAN;
- i++;
- }
+ DocumentDescription[] docDescriptions = new DocumentDescription[abortedList.size()];
+ Long[] recheckTimeArray = new Long[docDescriptions.length];
+ int[] actionArray = new int[docDescriptions.length];
+ for (int i = 0; i < docDescriptions.length; i++)
+ {
+ docDescriptions[i] = abortedList.get(i);
+ recheckTimeArray[i] = new Long(0L);
+ actionArray[i] = IJobManager.ACTION_RESCAN;
+ }
- jobManager.requeueDocumentMultiple(docDescriptions,recheckTimeArray,actionArray);
- }
+ jobManager.requeueDocumentMultiple(docDescriptions,recheckTimeArray,actionArray);
+ }
- // Mark the ones completed that were actually completed.
- if (completedList.size() > 0)
- {
- DocumentDescription[] docDescriptions = new DocumentDescription[completedList.size()];
- i = 0;
- while (i < docDescriptions.length)
+ // Mark the ones completed that were actually completed.
+ if (completedList.size() > 0)
{
- docDescriptions[i] = (DocumentDescription)completedList.get(i);
- i++;
- }
+ DocumentDescription[] docDescriptions = new DocumentDescription[completedList.size()];
+ for (int i = 0; i < docDescriptions.length; i++)
+ {
+ docDescriptions[i] = (DocumentDescription)completedList.get(i);
+ }
- jobManager.markDocumentCompletedMultiple(docDescriptions);
+ jobManager.markDocumentCompletedMultiple(docDescriptions);
+ }
}
+ break;
+ default:
+ throw new ManifoldCFException("Unexpected value for job type: '"+Integer.toString(job.getType())+"'");
}
- break;
- default:
- throw new ManifoldCFException("Unexpected value for job type: '"+Integer.toString(job.getType())+"'");
- }
- // Finally, if we're still alive, mark everything as "processed".
- i = 0;
- while (i < finishList.size())
- {
- QueuedDocument qd = finishList.get(i++);
- qd.setProcessed();
- }
+ // Finally, if we're still alive, mark everything as "processed".
+ for (int i = 0; i < finishList.size(); i++)
+ {
+ QueuedDocument qd = finishList.get(i);
+ qd.setProcessed();
+ }
+ }
+
+ }
+ finally
+ {
+ // Make sure we don't leave any dangling carrydown files
+ activity.discard();
}
+
+ // Successful processing of the set
+ // We count 'get version' time in the average, so even if we decide not to process a doc
+ // it still counts.
+ queueTracker.noteConnectionPerformance(qds.getCount(),connectionName,System.currentTimeMillis() - processingStartTime);
+
}
finally
{
- // Make sure we don't leave any dangling carrydown files
- activity.discard();
+ // Release any document temporary storage held by the connector
+ connector.releaseDocumentVersions(currentDocIDArray,newVersionStringArray);
}
-
- // Successful processing of the set
- // We count 'get version' time in the average, so even if we decide not to process a doc
- // it still counts.
- queueTracker.noteConnectionPerformance(qds.getCount(),connectionName,System.currentTimeMillis() - processingStartTime);
-
- }
- finally
- {
- // Release any document temporary storage held by the connector
- connector.releaseDocumentVersions(docIDArray,currentVersions);
+
}
}
- finally
- {
- RepositoryConnectorFactory.release(connector);
- }
+
+ // Now, handle the delete list
+ processDeleteLists(outputName,connector,connection,jobManager,
+ deleteList,ingester,
+ job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),queueTracker,currentTime);
+
+ // Handle hopcount removal
+ processHopcountRemovalLists(outputName,connector,connection,jobManager,
+ hopcountremoveList,ingester,
+ job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),queueTracker,currentTime);
+
+ }
+ finally
+ {
+ RepositoryConnectorFactory.release(connector);
}
+
}
+
+ // Handle rescanning
+ for (int i = 0; i < rescanList.size(); i++)
+ {
+ QueuedDocument qd = rescanList.get(i);
+ jobManager.resetDocument(qd.getDocumentDescription(),0L,IJobManager.ACTION_RESCAN,-1L,-1);
+ qd.setProcessed();
+ }
+
}
finally
{
// Note termination of processing of these documents in the overlap calculator
qds.endProcessing(queueTracker);
}
+
+ if (abortOnFail != null)
+ throw abortOnFail;
+
}
catch (ManifoldCFException e)
{
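The abortOnFail variable introduced at the top of the method implements a deferred-abort pattern: when a service interruption arrives with "abort on fail" set, the exception is recorded instead of thrown, every remaining document still receives its disposition, and the throw happens only after qds.endProcessing() has run. Reduced to its essentials (handle() and the batch contents are hypothetical):

  import java.util.Arrays;
  import java.util.List;

  public class DeferredAbortSketch
  {
    static void handle(String doc) throws Exception
    {
      if (doc.equals("bad"))
        throw new Exception("hard failure on " + doc);  // hypothetical failure
      System.out.println("dispatched " + doc);
    }

    public static void main(String[] args) throws Exception
    {
      List<String> batch = Arrays.asList("a", "bad", "b");
      Exception abortOnFail = null;
      try
      {
        for (String doc : batch)
        {
          try
          {
            handle(doc);
          }
          catch (Exception e)
          {
            abortOnFail = e;      // remember the failure, but finish the batch
          }
        }
      }
      finally
      {
        System.out.println("batch cleanup runs");        // like qds.endProcessing()
      }
      if (abortOnFail != null)
        throw abortOnFail;        // surfaced only after cleanup completed
    }
  }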
@@ -949,34 +922,89 @@ public class WorkerThread extends Thread
return true;
}
+ /** Move all documents from the source list onto the end of the target list, emptying the source. */
+ protected static void moveList(List<QueuedDocument> sourceList, List<QueuedDocument> targetList)
+ {
+ for (int i = 0; i < sourceList.size(); i++)
+ {
+ targetList.add(sourceList.get(i));
+ }
+ sourceList.clear();
+ }
+
+ /** Mark specified documents as 'hopcount removed', and remove them from the
+ * index. Documents in this state are presumed to have:
+ * (a) nothing in the index
+ * (b) no intrinsic links for which they are the origin
+ * In order to guarantee this situation, this method must be capable of doing much
+ * of what the deletion method must do. Specifically, it should be capable of deleting
+ * documents from the index should they be already present.
+ */
+ protected static void processHopcountRemovalLists(String outputName, IRepositoryConnector connector,
+ IRepositoryConnection connection, IJobManager jobManager, List<QueuedDocument> hopcountremoveList,
+ IIncrementalIngester ingester,
+ Long jobID, String[] legalLinkTypes, OutputActivity ingestLogger,
+ int hopcountMethod, QueueTracker queueTracker, long currentTime)
+ throws ManifoldCFException
+ {
+ // Remove from index
+ hopcountremoveList = removeFromIndex(outputName,connection.getName(),jobManager,hopcountremoveList,ingester,ingestLogger);
+ // Mark as 'hopcountremoved' in the job queue
+ processJobQueueHopcountRemovals(hopcountremoveList,connector,connection,
+ jobManager,jobID,legalLinkTypes,hopcountMethod,queueTracker,currentTime);
+ }
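Both processDeleteLists() and processHopcountRemovalLists() now share the same two-phase shape: phase one reconciles the index via removeFromIndex(), which may shrink the list when the output connection is down, and phase two updates jobqueue state only for the survivors. Schematically, with both phases reduced to stubs:

  import java.util.ArrayList;
  import java.util.List;

  public class TwoPhaseSketch
  {
    // Stub for removeFromIndex(): reconcile the index, return the documents
    // whose jobqueue state may now safely be updated.
    static List<String> indexPhase(List<String> docs)
    {
      List<String> survivors = new ArrayList<String>(docs);
      survivors.remove("unreachable");    // hypothetical: index delete failed for this one
      return survivors;
    }

    // Stub for processJobQueueDeletions() / processJobQueueHopcountRemovals().
    static void queuePhase(List<String> docs)
    {
      for (String doc : docs)
        System.out.println("jobqueue updated for " + doc);
    }

    public static void main(String[] args)
    {
      List<String> deleteList = new ArrayList<String>();
      deleteList.add("doc1");
      deleteList.add("unreachable");
      queuePhase(indexPhase(deleteList)); // index first, jobqueue second
    }
  }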
+
/** Clear specified documents out of the job queue and from the appliance.
*@param outputName is the output connection name.
*@param jobManager is the job manager.
- *@param jobmanagerDeleteList is a list of QueuedDocument objects to clean out.
+ *@param deleteList is a list of QueuedDocument objects to clean out.
*@param ingester is the handle to the incremental ingestion API control object.
- *@param ingesterDeleteList is a list of document id's to delete.
*/
protected static void processDeleteLists(String outputName, IRepositoryConnector connector,
- IRepositoryConnection connection, IJobManager jobManager, List<QueuedDocument> jobmanagerDeleteList,
- IIncrementalIngester ingester, List<String> ingesterDeleteList, List<String> ingesterDeleteListUnhashed,
+ IRepositoryConnection connection, IJobManager jobManager, List<QueuedDocument> deleteList,
+ IIncrementalIngester ingester,
Long jobID, String[] legalLinkTypes, OutputActivity ingestLogger,
int hopcountMethod, QueueTracker queueTracker, long currentTime)
throws ManifoldCFException
{
- String connectionName = connection.getName();
+ // Remove from index
+ deleteList = removeFromIndex(outputName,connection.getName(),jobManager,deleteList,ingester,ingestLogger);
+ // Delete from the job queue
+ processJobQueueDeletions(deleteList,connector,connection,
+ jobManager,jobID,legalLinkTypes,hopcountMethod,queueTracker,currentTime);
+ }
+ /** Remove a specified set of documents from the index.
+ *@return the list of documents whose state needs to be updated in jobqueue.
+ */
+ protected static List<QueuedDocument> removeFromIndex(String outputName,
+ String connectionName, IJobManager jobManager, List<QueuedDocument> deleteList,
+ IIncrementalIngester ingester, OutputActivity ingestLogger)
+ throws ManifoldCFException
+ {
+ List<String> ingesterDeleteList = new ArrayList<String>(deleteList.size());
+ for (int i = 0; i < deleteList.size(); i++)
+ {
+ QueuedDocument qd = deleteList.get(i);
+ DocumentIngestStatus oldDocStatus = qd.getLastIngestedStatus();
+ // See if we need to delete from index
+ if (oldDocStatus != null)
+ {
+ // Queue up to issue deletion
+ ingesterDeleteList.add(qd.getDocumentDescription().getDocumentIdentifierHash());
+ }
+ }
+
// First, do the ingester delete list. This guarantees that if the ingestion system is down, this operation will be handled atomically.
if (ingesterDeleteList.size() > 0)
{
String[] deleteClasses = new String[ingesterDeleteList.size()];
String[] deleteIDs = new String[ingesterDeleteList.size()];
- int i = 0;
- while (i < ingesterDeleteList.size())
+ for (int i = 0; i < ingesterDeleteList.size(); i++)
{
deleteClasses[i] = connectionName;
deleteIDs[i] = ingesterDeleteList.get(i);
- i++;
}
// Try to delete the documents via the output connection.
@@ -988,24 +1016,24 @@ public class WorkerThread extends Thread
{
// It looks like the output connection is not currently functioning, so we need to requeue instead of deleting
// those documents that could not be removed.
- List<QueuedDocument> newJobmanagerDeleteList = new ArrayList<QueuedDocument>();
+ List<QueuedDocument> newDeleteList = new ArrayList<QueuedDocument>();
List<QueuedDocument> newRequeueList = new ArrayList<QueuedDocument>();
- Map<String,String> ingesterMap = new HashMap<String,String>();
+ Set<String> ingesterSet = new HashSet<String>();
for (int j = 0 ; j < ingesterDeleteList.size() ; j++)
{
String id = ingesterDeleteList.get(j);
- ingesterMap.put(id,id);
+ ingesterSet.add(id);
}
- for (int j = 0 ; j < jobmanagerDeleteList.size() ; j++)
+ for (int j = 0 ; j < deleteList.size() ; j++)
{
- QueuedDocument qd = jobmanagerDeleteList.get(j);
+ QueuedDocument qd = deleteList.get(j);
DocumentDescription dd = qd.getDocumentDescription();
- String documentIdentifier = dd.getDocumentIdentifier();
- if (ingesterMap.get(documentIdentifier) != null)
+ String documentIdentifierHash = dd.getDocumentIdentifierHash();
+ if (ingesterSet.contains(documentIdentifierHash))
newRequeueList.add(qd);
else
- newJobmanagerDeleteList.add(qd);
+ newDeleteList.add(qd);
}
// Requeue those that are supposed to be requeued
@@ -1013,13 +1041,10 @@ public class WorkerThread extends Thread
e.getFailRetryCount());
// Process the ones that are just new job queue changes
- jobmanagerDeleteList = newJobmanagerDeleteList;
+ deleteList = newDeleteList;
}
}
-
- // Delete from the job queue
- processJobQueueDeletions(jobmanagerDeleteList,connector,connection,
- jobManager,jobID,legalLinkTypes,hopcountMethod,queueTracker,currentTime);
+ return deleteList;
}
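One detail worth noting in the hunk above: the old code keyed ingesterMap by document hashes but probed it with the unhashed identifier, so the requeue/delete split would essentially never match; the new Set<String> membership test compares hash to hash. The partition itself, in isolation and with hypothetical hash values:

  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.HashSet;
  import java.util.List;
  import java.util.Set;

  public class RequeuePartitionSketch
  {
    public static void main(String[] args)
    {
      // Hashes whose index deletion failed (hypothetical values).
      Set<String> failedHashes = new HashSet<String>(Arrays.asList("h1", "h3"));
      List<String> deleteList = Arrays.asList("h1", "h2", "h3");

      List<String> requeue = new ArrayList<String>();
      List<String> stillDelete = new ArrayList<String>();
      for (String hash : deleteList)
      {
        if (failedHashes.contains(hash))
          requeue.add(hash);       // index delete failed; retry later
        else
          stillDelete.add(hash);   // never made it to the index; safe to clear now
      }
      System.out.println("requeue=" + requeue + " delete=" + stillDelete);
    }
  }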
/** Process job queue deletions. Either the indexer has already been updated, or it is not necessary to update it.
@@ -1033,12 +1058,10 @@ public class WorkerThread extends Thread
if (jobmanagerDeleteList.size() > 0)
{
DocumentDescription[] deleteDescriptions = new DocumentDescription[jobmanagerDeleteList.size()];
- int i = 0;
- while (i < deleteDescriptions.length)
+ for (int i = 0; i < deleteDescriptions.length; i++)
{
QueuedDocument qd = jobmanagerDeleteList.get(i);
deleteDescriptions[i] = qd.getDocumentDescription();
- i++;
}
// Do the actual work.
@@ -1048,10 +1071,41 @@ public class WorkerThread extends Thread
ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,connector,connection,queueTracker,currentTime);
// Mark all these as done
- i = 0;
- while (i < jobmanagerDeleteList.size())
+ for (int i = 0; i < jobmanagerDeleteList.size(); i++)
+ {
+ QueuedDocument qd = jobmanagerDeleteList.get(i);
+ qd.setProcessed();
+ }
+ }
+ }
+
+ /** Process job queue hopcount removals. All indexer updates have already taken place.
+ */
+ protected static void processJobQueueHopcountRemovals(List<QueuedDocument> jobmanagerRemovalList,
+ IRepositoryConnector connector, IRepositoryConnection connection, IJobManager jobManager,
+ Long jobID, String[] legalLinkTypes, int hopcountMethod, QueueTracker queueTracker, long currentTime)
+ throws ManifoldCFException
+ {
+ // Now, do the document queue cleanup for deletions.
+ if (jobmanagerRemovalList.size() > 0)
+ {
+ DocumentDescription[] removalDescriptions = new DocumentDescription[jobmanagerRemovalList.size()];
+ for (int i = 0; i < removalDescriptions.length; i++)
+ {
+ QueuedDocument qd = jobmanagerRemovalList.get(i);
+ removalDescriptions[i] = qd.getDocumentDescription();
+ }
+
+ // Do the actual work.
+ DocumentDescription[] requeueCandidates = jobManager.markDocumentHopcountRemovalMultiple(jobID,legalLinkTypes,removalDescriptions,hopcountMethod);
+
+ // Requeue those documents that had carrydown data modifications
+ ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,connector,connection,queueTracker,currentTime);
+
+ // Mark all these as done
+ for (int i = 0; i < jobmanagerRemovalList.size(); i++)
{
- QueuedDocument qd = jobmanagerDeleteList.get(i++);
+ QueuedDocument qd = jobmanagerRemovalList.get(i);
qd.setProcessed();
}
}
Modified: manifoldcf/branches/CONNECTORS-501/tests/webcrawler/src/test/java/org/apache/manifoldcf/webcrawler_tests/MockWebService.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-501/tests/webcrawler/src/test/java/org/apache/manifoldcf/webcrawler_tests/MockWebService.java?rev=1372143&r1=1372142&r2=1372143&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-501/tests/webcrawler/src/test/java/org/apache/manifoldcf/webcrawler_tests/MockWebService.java (original)
+++ manifoldcf/branches/CONNECTORS-501/tests/webcrawler/src/test/java/org/apache/manifoldcf/webcrawler_tests/MockWebService.java Sun Aug 12 16:26:04 2012
@@ -138,7 +138,7 @@ public class MockWebService
// Temporary: Prevent links to children deeper than a certain level; this is to help
// the debug process
- if (theLevel < 2)
+ if (theLevel < 9)
{
// Generate links to direct children
for (int i = 0; i < docsPerLevel; i++)