You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/12/10 13:43:31 UTC
svn commit: r1644386 [2/2] - in
/manifoldcf/branches/CONNECTORS-1118/framework:
agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/
agents/src/main/java/org/apache/manifoldcf/agents/interfaces/
agents/src/main/java/org/apache/manifoldc...
Modified: manifoldcf/branches/CONNECTORS-1118/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1118/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1644386&r1=1644385&r2=1644386&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-1118/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java (original)
+++ manifoldcf/branches/CONNECTORS-1118/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java Wed Dec 10 12:43:31 2014
@@ -143,8 +143,9 @@ public class WorkerThread extends Thread
// Build a basic pipeline specification right off; we need it whenever
// we interact with Incremental Ingester.
- IPipelineSpecificationBasic pipelineSpecificationBasic = new PipelineSpecificationBasic(job);
- String lastIndexedOutputConnectionName = ingester.getLastIndexedOutputConnectionName(pipelineSpecificationBasic);
+ IPipelineConnections pipelineConnections = new PipelineConnections(new PipelineSpecificationBasic(job),transformationConnectionManager,outputConnectionManager);
+
+ String lastIndexedOutputConnectionName = ingester.getLastIndexedOutputConnectionName(pipelineConnections);
// Universal job data we'll need later
String connectionName = job.getConnectionName();
@@ -289,7 +290,7 @@ public class WorkerThread extends Thread
IPipelineSpecification pipelineSpecification;
try
{
- pipelineSpecification = new PipelineSpecification(pipelineSpecificationBasic,job,ingester);
+ pipelineSpecification = new PipelineSpecification(pipelineConnections,job,ingester);
}
catch (ServiceInterruption e)
{
@@ -358,7 +359,7 @@ public class WorkerThread extends Thread
ProcessActivity activity = new ProcessActivity(job.getID(),processID,
rt,jobManager,ingester,
- connectionName,pipelineSpecification,transformationConnectionManager,outputConnectionManager,
+ connectionName,pipelineSpecification,
previousDocuments,
currentTime,
job.getExpiration(),
@@ -388,7 +389,7 @@ public class WorkerThread extends Thread
String documentIdentifierHash = qd.getDocumentDescription().getDocumentIdentifierHash();
// In order to be able to loop over all the components that the incremental ingester knows about, we need to know
// what the FIRST output is.
- DocumentIngestStatusSet set = qd.getLastIngestedStatus(ingester.getFirstIndexedOutputConnectionName(pipelineSpecificationBasic));
+ DocumentIngestStatusSet set = qd.getLastIngestedStatus(ingester.getFirstIndexedOutputConnectionName(pipelineConnections));
if (set != null)
{
Iterator<String> componentHashes = set.componentIterator();
@@ -401,7 +402,7 @@ public class WorkerThread extends Thread
{
// This component must be removed.
ingester.documentRemove(
- pipelineSpecificationBasic,
+ pipelineConnections,
connectionName,documentIdentifierHash,componentHash,
ingestLogger);
}
@@ -510,7 +511,7 @@ public class WorkerThread extends Thread
}
// This method should exercise reasonable intelligence. If the document has never been indexed, it should detect that
// and stop. Otherwise, it should update the statistics accordingly.
- ingester.documentCheckMultiple(pipelineSpecificationBasic,checkClasses,checkIDs,currentTime);
+ ingester.documentCheckMultiple(pipelineConnections,checkClasses,checkIDs,currentTime);
}
// Process the finish list!
@@ -545,7 +546,7 @@ public class WorkerThread extends Thread
timeIDClasses[i] = connectionName;
timeIDHashes[i] = documentIDHash;
}
- long[] timeArray = ingester.getDocumentUpdateIntervalMultiple(pipelineSpecificationBasic,timeIDClasses,timeIDHashes);
+ long[] timeArray = ingester.getDocumentUpdateIntervalMultiple(pipelineConnections,timeIDClasses,timeIDHashes);
Long[] recheckTimeArray = new Long[timeArray.length];
int[] actionArray = new int[timeArray.length];
DocumentDescription[] recrawlDocs = new DocumentDescription[finishList.size()];
@@ -686,12 +687,12 @@ public class WorkerThread extends Thread
}
// Now, handle the delete list
- processDeleteLists(pipelineSpecificationBasic,connector,connection,jobManager,
+ processDeleteLists(pipelineConnections,connector,connection,jobManager,
deleteList,ingester,
job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),rt,currentTime);
// Handle hopcount removal
- processHopcountRemovalLists(pipelineSpecificationBasic,connector,connection,jobManager,
+ processHopcountRemovalLists(pipelineConnections,connector,connection,jobManager,
hopcountremoveList,ingester,
job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),rt,currentTime);
@@ -841,7 +842,7 @@ public class WorkerThread extends Thread
* of what the deletion method must do. Specifically, it should be capable of deleting
* documents from the index should they be already present.
*/
- protected static void processHopcountRemovalLists(IPipelineSpecificationBasic pipelineSpecificationBasic,
+ protected static void processHopcountRemovalLists(IPipelineConnections pipelineConnections,
IRepositoryConnector connector,
IRepositoryConnection connection, IJobManager jobManager,
List<QueuedDocument> hopcountremoveList,
@@ -851,20 +852,20 @@ public class WorkerThread extends Thread
throws ManifoldCFException
{
// Remove from index
- hopcountremoveList = removeFromIndex(pipelineSpecificationBasic,connection.getName(),jobManager,hopcountremoveList,ingester,ingestLogger);
+ hopcountremoveList = removeFromIndex(pipelineConnections,connection.getName(),jobManager,hopcountremoveList,ingester,ingestLogger);
// Mark as 'hopcountremoved' in the job queue
processJobQueueHopcountRemovals(hopcountremoveList,connector,connection,
jobManager,jobID,legalLinkTypes,hopcountMethod,rt,currentTime);
}
/** Clear specified documents out of the job queue and from the appliance.
- *@param pipelineSpecificationBasic is the basic pipeline specification for this job.
+ *@param pipelineConnections is the pipeline connections object for this job (the basic pipeline specification together with its transformation and output connections).
*@param jobManager is the job manager.
*@param deleteList is a list of QueuedDocument objects to clean out.
*@param ingester is the handle to the incremental ingestion API control object.
*@param ingesterDeleteList is a list of document id's to delete.
*/
- protected static void processDeleteLists(IPipelineSpecificationBasic pipelineSpecificationBasic,
+ protected static void processDeleteLists(IPipelineConnections pipelineConnections,
IRepositoryConnector connector,
IRepositoryConnection connection, IJobManager jobManager,
List<QueuedDocument> deleteList,
@@ -874,7 +875,7 @@ public class WorkerThread extends Thread
throws ManifoldCFException
{
// Remove from index
- deleteList = removeFromIndex(pipelineSpecificationBasic,connection.getName(),jobManager,deleteList,ingester,ingestLogger);
+ deleteList = removeFromIndex(pipelineConnections,connection.getName(),jobManager,deleteList,ingester,ingestLogger);
// Delete from the job queue
processJobQueueDeletions(deleteList,connector,connection,
jobManager,jobID,legalLinkTypes,hopcountMethod,rt,currentTime);
@@ -883,7 +884,7 @@ public class WorkerThread extends Thread
/** Remove a specified set of documents from the index.
*@return the list of documents whose state needs to be updated in jobqueue.
*/
- protected static List<QueuedDocument> removeFromIndex(IPipelineSpecificationBasic pipelineSpecificationBasic,
+ protected static List<QueuedDocument> removeFromIndex(IPipelineConnections pipelineConnections,
String connectionName, IJobManager jobManager, List<QueuedDocument> deleteList,
IIncrementalIngester ingester, OutputActivity ingestLogger)
throws ManifoldCFException
@@ -915,7 +916,7 @@ public class WorkerThread extends Thread
// Try to delete the documents via the output connection.
try
{
- ingester.documentDeleteMultiple(pipelineSpecificationBasic,deleteClasses,deleteIDs,ingestLogger);
+ ingester.documentDeleteMultiple(pipelineConnections,deleteClasses,deleteIDs,ingestLogger);
}
catch (ServiceInterruption e)
{
@@ -1052,181 +1053,6 @@ public class WorkerThread extends Thread
// Nested classes
- /** Pipeline connections implementation.
- */
- protected static class PipelineConnections implements IPipelineConnections
- {
- protected final IPipelineSpecification spec;
- protected final String[] transformationConnectionNames;
- protected final ITransformationConnection[] transformationConnections;
- protected final String[] outputConnectionNames;
- protected final IOutputConnection[] outputConnections;
- // We need a way to get from stage index to connection index.
- // These arrays are looked up by stage index, and return the appropriate connection index.
- protected final Map<Integer,Integer> transformationConnectionLookupMap = new HashMap<Integer,Integer>();
- protected final Map<Integer,Integer> outputConnectionLookupMap = new HashMap<Integer,Integer>();
-
- public PipelineConnections(ITransformationConnectionManager transformationConnectionManager,
- IOutputConnectionManager outputConnectionManager, IPipelineSpecification spec)
- throws ManifoldCFException
- {
- this.spec = spec;
- IPipelineSpecificationBasic basicSpec = spec.getBasicPipelineSpecification();
- // Now, load all the connections we'll ever need, being sure to only load one copy of each.
- // We first segregate them into unique transformation and output connections.
- int count = basicSpec.getStageCount();
- Set<String> transformations = new HashSet<String>();
- Set<String> outputs = new HashSet<String>();
- for (int i = 0; i < count; i++)
- {
- if (basicSpec.checkStageOutputConnection(i))
- outputs.add(basicSpec.getStageConnectionName(i));
- else
- transformations.add(basicSpec.getStageConnectionName(i));
- }
-
- Map<String,Integer> transformationNameMap = new HashMap<String,Integer>();
- Map<String,Integer> outputNameMap = new HashMap<String,Integer>();
- transformationConnectionNames = new String[transformations.size()];
- outputConnectionNames = new String[outputs.size()];
- int index = 0;
- for (String connectionName : transformations)
- {
- transformationConnectionNames[index] = connectionName;
- transformationNameMap.put(connectionName,new Integer(index++));
- }
- index = 0;
- for (String connectionName : outputs)
- {
- outputConnectionNames[index] = connectionName;
- outputNameMap.put(connectionName,new Integer(index++));
- }
- // Load!
- transformationConnections = transformationConnectionManager.loadMultiple(transformationConnectionNames);
- outputConnections = outputConnectionManager.loadMultiple(outputConnectionNames);
-
- for (int i = 0; i < count; i++)
- {
- Integer k;
- if (basicSpec.checkStageOutputConnection(i))
- {
- outputConnectionLookupMap.put(new Integer(i),outputNameMap.get(basicSpec.getStageConnectionName(i)));
- }
- else
- {
- transformationConnectionLookupMap.put(new Integer(i),transformationNameMap.get(basicSpec.getStageConnectionName(i)));
- }
- }
- }
-
- @Override
- public IPipelineSpecification getSpecification()
- {
- return spec;
- }
-
- @Override
- public String[] getTransformationConnectionNames()
- {
- return transformationConnectionNames;
- }
-
- @Override
- public ITransformationConnection[] getTransformationConnections()
- {
- return transformationConnections;
- }
-
- @Override
- public String[] getOutputConnectionNames()
- {
- return outputConnectionNames;
- }
-
- @Override
- public IOutputConnection[] getOutputConnections()
- {
- return outputConnections;
- }
-
- @Override
- public Integer getTransformationConnectionIndex(int stage)
- {
- return transformationConnectionLookupMap.get(new Integer(stage));
- }
-
- @Override
- public Integer getOutputConnectionIndex(int stage)
- {
- return outputConnectionLookupMap.get(new Integer(stage));
- }
-
- }
-
- /** IPipelineConnectionsWithVersions implementation.
- */
- protected static class PipelineConnectionsWithVersions implements IPipelineConnectionsWithVersions
- {
- protected final IPipelineConnections pipelineConnections;
- protected final IPipelineSpecificationWithVersions pipelineSpecificationWithVersions;
-
- public PipelineConnectionsWithVersions(IPipelineConnections pipelineConnections, IPipelineSpecificationWithVersions pipelineSpecificationWithVersions)
- throws ManifoldCFException
- {
- this.pipelineConnections = pipelineConnections;
- this.pipelineSpecificationWithVersions = pipelineSpecificationWithVersions;
- }
-
- @Override
- public IPipelineSpecification getSpecification()
- {
- return pipelineConnections.getSpecification();
- }
-
- @Override
- public String[] getTransformationConnectionNames()
- {
- return pipelineConnections.getTransformationConnectionNames();
- }
-
- @Override
- public ITransformationConnection[] getTransformationConnections()
- {
- return pipelineConnections.getTransformationConnections();
- }
-
- @Override
- public String[] getOutputConnectionNames()
- {
- return pipelineConnections.getOutputConnectionNames();
- }
-
- @Override
- public IOutputConnection[] getOutputConnections()
- {
- return pipelineConnections.getOutputConnections();
- }
-
- @Override
- public Integer getTransformationConnectionIndex(int stage)
- {
- return pipelineConnections.getTransformationConnectionIndex(stage);
- }
-
- @Override
- public Integer getOutputConnectionIndex(int stage)
- {
- return pipelineConnections.getOutputConnectionIndex(stage);
- }
-
- @Override
- public IPipelineSpecificationWithVersions getSpecificationWithVersions()
- {
- return pipelineSpecificationWithVersions;
- }
-
- }
-
/** Process activity class wraps access to the ingester and job queue.
*/
protected static class ProcessActivity implements IProcessActivity
@@ -1238,8 +1064,6 @@ public class WorkerThread extends Thread
protected final IIncrementalIngester ingester;
protected final String connectionName;
protected final IPipelineSpecification pipelineSpecification;
- protected final ITransformationConnectionManager transformationConnectionManager;
- protected final IOutputConnectionManager outputConnectionManager;
protected final Map<String,QueuedDocument> previousDocuments;
protected final long currentTime;
protected final Long expireInterval;
@@ -1253,9 +1077,6 @@ public class WorkerThread extends Thread
protected final OutputActivity ingestLogger;
protected final IReprioritizationTracker rt;
- protected IPipelineConnections pipelineConnections = null;
- protected IPipelineConnectionsWithVersions pipelineConnectionsWithVersions = null;
-
// We submit references in bulk, because that's way more efficient.
protected final Map<DocumentReference,DocumentReference> referenceList = new HashMap<DocumentReference,DocumentReference>();
@@ -1285,14 +1106,6 @@ public class WorkerThread extends Thread
// This represents primary documents.
protected final Set<String> touchedPrimarySet = new HashSet<String>();
- protected IPipelineConnections getPipelineConnections()
- throws ManifoldCFException
- {
- if (pipelineConnections == null)
- pipelineConnections = new PipelineConnections(transformationConnectionManager,outputConnectionManager,pipelineSpecification);
- return pipelineConnections;
- }
-
/** Constructor.
*@param jobManager is the job manager
*@param ingester is the ingester
@@ -1301,7 +1114,7 @@ public class WorkerThread extends Thread
IReprioritizationTracker rt, IJobManager jobManager,
IIncrementalIngester ingester,
String connectionName,
- IPipelineSpecification pipelineSpecification, ITransformationConnectionManager transformationConnectionManager, IOutputConnectionManager outputConnectionManager,
+ IPipelineSpecification pipelineSpecification,
Map<String,QueuedDocument> previousDocuments,
long currentTime,
Long expireInterval,
@@ -1318,8 +1131,6 @@ public class WorkerThread extends Thread
this.ingester = ingester;
this.connectionName = connectionName;
this.pipelineSpecification = pipelineSpecification;
- this.transformationConnectionManager = transformationConnectionManager;
- this.outputConnectionManager = outputConnectionManager;
this.previousDocuments = previousDocuments;
this.currentTime = currentTime;
this.expireInterval = expireInterval;
@@ -1409,7 +1220,7 @@ public class WorkerThread extends Thread
{
String documentIdentifierHash = ManifoldCF.hash(documentIdentifier);
String componentIdentifierHash = computeComponentIDHash(componentIdentifier);
- IPipelineSpecificationWithVersions spec = computePipelineSpecification(documentIdentifierHash,componentIdentifierHash,documentIdentifier);
+ IPipelineSpecificationWithVersions spec = computePipelineSpecificationWithVersions(documentIdentifierHash,componentIdentifierHash,documentIdentifier);
return ingester.checkFetchDocument(spec,newVersionString,connection.getACLAuthority());
}
@@ -1634,7 +1445,7 @@ public class WorkerThread extends Thread
String componentIdentifierHash = computeComponentIDHash(componentIdentifier);
checkMultipleDispositions(documentIdentifier,componentIdentifier,componentIdentifierHash);
ingester.documentRecord(
- pipelineSpecification.getBasicPipelineSpecification(),
+ pipelineSpecification,
connectionName,documentIdentifierHash,componentIdentifierHash,
version,currentTime);
touchedSet.add(documentIdentifier);
@@ -1690,7 +1501,7 @@ public class WorkerThread extends Thread
// indicates that it should always be refetched. But I have no way to describe this situation
// in the database at the moment.
ingester.documentIngest(
- new PipelineConnectionsWithVersions(getPipelineConnections(),computePipelineSpecification(documentIdentifierHash,componentIdentifierHash,documentIdentifier)),
+ computePipelineSpecificationWithVersions(documentIdentifierHash,componentIdentifierHash,documentIdentifier),
connectionName,documentIdentifierHash,componentIdentifierHash,
version,
connection.getACLAuthority(),
@@ -1733,7 +1544,7 @@ public class WorkerThread extends Thread
checkMultipleDispositions(documentIdentifier,componentIdentifier,componentIdentifierHash);
ingester.documentNoData(
- new PipelineConnectionsWithVersions(getPipelineConnections(),computePipelineSpecification(documentIdentifierHash,componentIdentifierHash,documentIdentifier)),
+ computePipelineSpecificationWithVersions(documentIdentifierHash,componentIdentifierHash,documentIdentifier),
connectionName,documentIdentifierHash,componentIdentifierHash,
version,
connection.getACLAuthority(),
@@ -1758,7 +1569,7 @@ public class WorkerThread extends Thread
String documentIdentifierHash = ManifoldCF.hash(documentIdentifier);
ingester.documentRemove(
- pipelineSpecification.getBasicPipelineSpecification(),
+ pipelineSpecification,
connectionName,documentIdentifierHash,null,
ingestLogger);
@@ -2123,7 +1934,7 @@ public class WorkerThread extends Thread
throws ManifoldCFException, ServiceInterruption
{
return ingester.checkDateIndexable(
- getPipelineConnections(),date,
+ pipelineSpecification,date,
ingestLogger);
}
@@ -2136,7 +1947,7 @@ public class WorkerThread extends Thread
throws ManifoldCFException, ServiceInterruption
{
return ingester.checkMimeTypeIndexable(
- getPipelineConnections(),mimeType,
+ pipelineSpecification,mimeType,
ingestLogger);
}
@@ -2149,7 +1960,7 @@ public class WorkerThread extends Thread
throws ManifoldCFException, ServiceInterruption
{
return ingester.checkDocumentIndexable(
- getPipelineConnections(),localFile,
+ pipelineSpecification,localFile,
ingestLogger);
}
@@ -2162,7 +1973,7 @@ public class WorkerThread extends Thread
throws ManifoldCFException, ServiceInterruption
{
return ingester.checkLengthIndexable(
- getPipelineConnections(),length,
+ pipelineSpecification,length,
ingestLogger);
}
@@ -2176,7 +1987,7 @@ public class WorkerThread extends Thread
throws ManifoldCFException, ServiceInterruption
{
return ingester.checkURLIndexable(
- getPipelineConnections(),url,
+ pipelineSpecification,url,
ingestLogger);
}
@@ -2264,7 +2075,7 @@ public class WorkerThread extends Thread
components.add(componentIdentifierHash);
}
- protected IPipelineSpecificationWithVersions computePipelineSpecification(String documentIdentifierHash,
+ protected IPipelineSpecificationWithVersions computePipelineSpecificationWithVersions(String documentIdentifierHash,
String componentIdentifierHash,
String documentIdentifier)
{