You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text version).
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/12/10 14:36:43 UTC

svn commit: r1644399 [2/2] - in /manifoldcf/trunk: ./ framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/ framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/ framework/agents/src/main/java/org/apache/manif...

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1644399&r1=1644398&r2=1644399&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java Wed Dec 10 13:36:42 2014
@@ -143,8 +143,9 @@ public class WorkerThread extends Thread
 
             // Build a basic pipeline specification right off; we need it whenever
             // we interact with Incremental Ingester.
-            IPipelineSpecificationBasic pipelineSpecificationBasic = new PipelineSpecificationBasic(job);
-            String lastIndexedOutputConnectionName = ingester.getLastIndexedOutputConnectionName(pipelineSpecificationBasic);
+            IPipelineConnections pipelineConnections = new PipelineConnections(new PipelineSpecificationBasic(job),transformationConnectionManager,outputConnectionManager);
+            
+            String lastIndexedOutputConnectionName = ingester.getLastIndexedOutputConnectionName(pipelineConnections);
 
             // Universal job data we'll need later
             String connectionName = job.getConnectionName();
@@ -289,7 +290,7 @@ public class WorkerThread extends Thread
                   IPipelineSpecification pipelineSpecification;
                   try
                   {
-                    pipelineSpecification = new PipelineSpecification(pipelineSpecificationBasic,job,ingester);
+                    pipelineSpecification = new PipelineSpecification(pipelineConnections,job,ingester);
                   }
                   catch (ServiceInterruption e)
                   {
@@ -358,7 +359,7 @@ public class WorkerThread extends Thread
                     
                     ProcessActivity activity = new ProcessActivity(job.getID(),processID,
                       rt,jobManager,ingester,
-                      connectionName,pipelineSpecification,transformationConnectionManager,outputConnectionManager,
+                      connectionName,pipelineSpecification,
                       previousDocuments,
                       currentTime,
                       job.getExpiration(),
@@ -388,7 +389,7 @@ public class WorkerThread extends Thread
                             String documentIdentifierHash = qd.getDocumentDescription().getDocumentIdentifierHash();
                             // In order to be able to loop over all the components that the incremental ingester knows about, we need to know
                             // what the FIRST output is.
-                            DocumentIngestStatusSet set = qd.getLastIngestedStatus(ingester.getFirstIndexedOutputConnectionName(pipelineSpecificationBasic));
+                            DocumentIngestStatusSet set = qd.getLastIngestedStatus(ingester.getFirstIndexedOutputConnectionName(pipelineConnections));
                             if (set != null)
                             {
                               Iterator<String> componentHashes = set.componentIterator();
@@ -401,7 +402,7 @@ public class WorkerThread extends Thread
                                 {
                                   // This component must be removed.
                                   ingester.documentRemove(
-                                    pipelineSpecificationBasic,
+                                    pipelineConnections,
                                     connectionName,documentIdentifierHash,componentHash,
                                     ingestLogger);
                                 }
@@ -510,7 +511,7 @@ public class WorkerThread extends Thread
                         }
                         // This method should exercise reasonable intelligence.  If the document has never been indexed, it should detect that
                         // and stop.  Otherwise, it should update the statistics accordingly.
-                        ingester.documentCheckMultiple(pipelineSpecificationBasic,checkClasses,checkIDs,currentTime);
+                        ingester.documentCheckMultiple(pipelineConnections,checkClasses,checkIDs,currentTime);
                       }
 
                       // Process the finish list!
@@ -545,7 +546,7 @@ public class WorkerThread extends Thread
                               timeIDClasses[i] = connectionName;
                               timeIDHashes[i] = documentIDHash;
                             }
-                            long[] timeArray = ingester.getDocumentUpdateIntervalMultiple(pipelineSpecificationBasic,timeIDClasses,timeIDHashes);
+                            long[] timeArray = ingester.getDocumentUpdateIntervalMultiple(pipelineConnections,timeIDClasses,timeIDHashes);
                             Long[] recheckTimeArray = new Long[timeArray.length];
                             int[] actionArray = new int[timeArray.length];
                             DocumentDescription[] recrawlDocs = new DocumentDescription[finishList.size()];
@@ -686,12 +687,12 @@ public class WorkerThread extends Thread
                   }
                   
                   // Now, handle the delete list
-                  processDeleteLists(pipelineSpecificationBasic,connector,connection,jobManager,
+                  processDeleteLists(pipelineConnections,connector,connection,jobManager,
                     deleteList,ingester,
                     job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),rt,currentTime);
 
                   // Handle hopcount removal
-                  processHopcountRemovalLists(pipelineSpecificationBasic,connector,connection,jobManager,
+                  processHopcountRemovalLists(pipelineConnections,connector,connection,jobManager,
                     hopcountremoveList,ingester,
                     job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),rt,currentTime);
 
@@ -841,7 +842,7 @@ public class WorkerThread extends Thread
   * of what the deletion method must do.  Specifically, it should be capable of deleting
   * documents from the index should they be already present.
   */
-  protected static void processHopcountRemovalLists(IPipelineSpecificationBasic pipelineSpecificationBasic,
+  protected static void processHopcountRemovalLists(IPipelineConnections pipelineConnections,
     IRepositoryConnector connector,
     IRepositoryConnection connection, IJobManager jobManager,
     List<QueuedDocument> hopcountremoveList,
@@ -851,20 +852,20 @@ public class WorkerThread extends Thread
     throws ManifoldCFException
   {
     // Remove from index
-    hopcountremoveList = removeFromIndex(pipelineSpecificationBasic,connection.getName(),jobManager,hopcountremoveList,ingester,ingestLogger);
+    hopcountremoveList = removeFromIndex(pipelineConnections,connection.getName(),jobManager,hopcountremoveList,ingester,ingestLogger);
     // Mark as 'hopcountremoved' in the job queue
     processJobQueueHopcountRemovals(hopcountremoveList,connector,connection,
       jobManager,jobID,legalLinkTypes,hopcountMethod,rt,currentTime);
   }
 
   /** Clear specified documents out of the job queue and from the appliance.
-  *@param pipelineSpecificationBasic is the basic pipeline specification for this job.
+  *@param pipelineConnections is the basic pipeline specification for this job.
   *@param jobManager is the job manager.
   *@param deleteList is a list of QueuedDocument objects to clean out.
   *@param ingester is the handle to the incremental ingestion API control object.
   *@param ingesterDeleteList is a list of document id's to delete.
   */
-  protected static void processDeleteLists(IPipelineSpecificationBasic pipelineSpecificationBasic,
+  protected static void processDeleteLists(IPipelineConnections pipelineConnections,
     IRepositoryConnector connector,
     IRepositoryConnection connection, IJobManager jobManager,
     List<QueuedDocument> deleteList,
@@ -874,7 +875,7 @@ public class WorkerThread extends Thread
     throws ManifoldCFException
   {
     // Remove from index
-    deleteList = removeFromIndex(pipelineSpecificationBasic,connection.getName(),jobManager,deleteList,ingester,ingestLogger);
+    deleteList = removeFromIndex(pipelineConnections,connection.getName(),jobManager,deleteList,ingester,ingestLogger);
     // Delete from the job queue
     processJobQueueDeletions(deleteList,connector,connection,
       jobManager,jobID,legalLinkTypes,hopcountMethod,rt,currentTime);
@@ -883,7 +884,7 @@ public class WorkerThread extends Thread
   /** Remove a specified set of documents from the index.
   *@return the list of documents whose state needs to be updated in jobqueue.
   */
-  protected static List<QueuedDocument> removeFromIndex(IPipelineSpecificationBasic pipelineSpecificationBasic,
+  protected static List<QueuedDocument> removeFromIndex(IPipelineConnections pipelineConnections,
     String connectionName, IJobManager jobManager, List<QueuedDocument> deleteList, 
     IIncrementalIngester ingester, OutputActivity ingestLogger)
     throws ManifoldCFException
@@ -915,7 +916,7 @@ public class WorkerThread extends Thread
       // Try to delete the documents via the output connection.
       try
       {
-        ingester.documentDeleteMultiple(pipelineSpecificationBasic,deleteClasses,deleteIDs,ingestLogger);
+        ingester.documentDeleteMultiple(pipelineConnections,deleteClasses,deleteIDs,ingestLogger);
       }
       catch (ServiceInterruption e)
       {
@@ -1052,181 +1053,6 @@ public class WorkerThread extends Thread
 
   // Nested classes
 
-  /** Pipeline connections implementation.
-  */
-  protected static class PipelineConnections implements IPipelineConnections
-  {
-    protected final IPipelineSpecification spec;
-    protected final String[] transformationConnectionNames;
-    protected final ITransformationConnection[] transformationConnections;
-    protected final String[] outputConnectionNames;
-    protected final IOutputConnection[] outputConnections;
-    // We need a way to get from stage index to connection index.
-    // These arrays are looked up by stage index, and return the appropriate connection index.
-    protected final Map<Integer,Integer> transformationConnectionLookupMap = new HashMap<Integer,Integer>();
-    protected final Map<Integer,Integer> outputConnectionLookupMap = new HashMap<Integer,Integer>();
-    
-    public PipelineConnections(ITransformationConnectionManager transformationConnectionManager,
-      IOutputConnectionManager outputConnectionManager, IPipelineSpecification spec)
-      throws ManifoldCFException
-    {
-      this.spec = spec;
-      IPipelineSpecificationBasic basicSpec = spec.getBasicPipelineSpecification();
-      // Now, load all the connections we'll ever need, being sure to only load one copy of each.
-      // We first segregate them into unique transformation and output connections.
-      int count = basicSpec.getStageCount();
-      Set<String> transformations = new HashSet<String>();
-      Set<String> outputs = new HashSet<String>();
-      for (int i = 0; i < count; i++)
-      {
-        if (basicSpec.checkStageOutputConnection(i))
-          outputs.add(basicSpec.getStageConnectionName(i));
-        else
-          transformations.add(basicSpec.getStageConnectionName(i));
-      }
-      
-      Map<String,Integer> transformationNameMap = new HashMap<String,Integer>();
-      Map<String,Integer> outputNameMap = new HashMap<String,Integer>();
-      transformationConnectionNames = new String[transformations.size()];
-      outputConnectionNames = new String[outputs.size()];
-      int index = 0;
-      for (String connectionName : transformations)
-      {
-        transformationConnectionNames[index] = connectionName;
-        transformationNameMap.put(connectionName,new Integer(index++));
-      }
-      index = 0;
-      for (String connectionName : outputs)
-      {
-        outputConnectionNames[index] = connectionName;
-        outputNameMap.put(connectionName,new Integer(index++));
-      }
-      // Load!
-      transformationConnections = transformationConnectionManager.loadMultiple(transformationConnectionNames);
-      outputConnections = outputConnectionManager.loadMultiple(outputConnectionNames);
-      
-      for (int i = 0; i < count; i++)
-      {
-        Integer k;
-        if (basicSpec.checkStageOutputConnection(i))
-        {
-          outputConnectionLookupMap.put(new Integer(i),outputNameMap.get(basicSpec.getStageConnectionName(i)));
-        }
-        else
-        {
-          transformationConnectionLookupMap.put(new Integer(i),transformationNameMap.get(basicSpec.getStageConnectionName(i)));
-        }
-      }
-    }
-    
-    @Override
-    public IPipelineSpecification getSpecification()
-    {
-      return spec;
-    }
-    
-    @Override
-    public String[] getTransformationConnectionNames()
-    {
-      return transformationConnectionNames;
-    }
-    
-    @Override
-    public ITransformationConnection[] getTransformationConnections()
-    {
-      return transformationConnections;
-    }
-    
-    @Override
-    public String[] getOutputConnectionNames()
-    {
-      return outputConnectionNames;
-    }
-    
-    @Override
-    public IOutputConnection[] getOutputConnections()
-    {
-      return outputConnections;
-    }
-    
-    @Override
-    public Integer getTransformationConnectionIndex(int stage)
-    {
-      return transformationConnectionLookupMap.get(new Integer(stage));
-    }
-    
-    @Override
-    public Integer getOutputConnectionIndex(int stage)
-    {
-      return outputConnectionLookupMap.get(new Integer(stage));
-    }
-    
-  }
-
-  /** IPipelineConnectionsWithVersions implementation.
-  */
-  protected static class PipelineConnectionsWithVersions implements IPipelineConnectionsWithVersions
-  {
-    protected final IPipelineConnections pipelineConnections;
-    protected final IPipelineSpecificationWithVersions pipelineSpecificationWithVersions;
-    
-    public PipelineConnectionsWithVersions(IPipelineConnections pipelineConnections, IPipelineSpecificationWithVersions pipelineSpecificationWithVersions)
-      throws ManifoldCFException
-    {
-      this.pipelineConnections = pipelineConnections;
-      this.pipelineSpecificationWithVersions = pipelineSpecificationWithVersions;
-    }
-    
-    @Override
-    public IPipelineSpecification getSpecification()
-    {
-      return pipelineConnections.getSpecification();
-    }
-    
-    @Override
-    public String[] getTransformationConnectionNames()
-    {
-      return pipelineConnections.getTransformationConnectionNames();
-    }
-    
-    @Override
-    public ITransformationConnection[] getTransformationConnections()
-    {
-      return pipelineConnections.getTransformationConnections();
-    }
-    
-    @Override
-    public String[] getOutputConnectionNames()
-    {
-      return pipelineConnections.getOutputConnectionNames();
-    }
-    
-    @Override
-    public IOutputConnection[] getOutputConnections()
-    {
-      return pipelineConnections.getOutputConnections();
-    }
-    
-    @Override
-    public Integer getTransformationConnectionIndex(int stage)
-    {
-      return pipelineConnections.getTransformationConnectionIndex(stage);
-    }
-    
-    @Override
-    public Integer getOutputConnectionIndex(int stage)
-    {
-      return pipelineConnections.getOutputConnectionIndex(stage);
-    }
-
-    @Override
-    public IPipelineSpecificationWithVersions getSpecificationWithVersions()
-    {
-      return pipelineSpecificationWithVersions;
-    }
-    
-  }
-
   /** Process activity class wraps access to the ingester and job queue.
   */
   protected static class ProcessActivity implements IProcessActivity
@@ -1238,8 +1064,6 @@ public class WorkerThread extends Thread
     protected final IIncrementalIngester ingester;
     protected final String connectionName;
     protected final IPipelineSpecification pipelineSpecification;
-    protected final ITransformationConnectionManager transformationConnectionManager;
-    protected final IOutputConnectionManager outputConnectionManager;
     protected final Map<String,QueuedDocument> previousDocuments;
     protected final long currentTime;
     protected final Long expireInterval;
@@ -1253,9 +1077,6 @@ public class WorkerThread extends Thread
     protected final OutputActivity ingestLogger;
     protected final IReprioritizationTracker rt;
 
-    protected IPipelineConnections pipelineConnections = null;
-    protected IPipelineConnectionsWithVersions pipelineConnectionsWithVersions = null;
-    
     // We submit references in bulk, because that's way more efficient.
     protected final Map<DocumentReference,DocumentReference> referenceList = new HashMap<DocumentReference,DocumentReference>();
 
@@ -1285,14 +1106,6 @@ public class WorkerThread extends Thread
     // This represents primary documents.
     protected final Set<String> touchedPrimarySet = new HashSet<String>();
     
-    protected IPipelineConnections getPipelineConnections()
-      throws ManifoldCFException
-    {
-      if (pipelineConnections == null)
-        pipelineConnections = new PipelineConnections(transformationConnectionManager,outputConnectionManager,pipelineSpecification);
-      return pipelineConnections;
-    }
-    
     /** Constructor.
     *@param jobManager is the job manager
     *@param ingester is the ingester
@@ -1301,7 +1114,7 @@ public class WorkerThread extends Thread
       IReprioritizationTracker rt, IJobManager jobManager,
       IIncrementalIngester ingester,
       String connectionName,
-      IPipelineSpecification pipelineSpecification, ITransformationConnectionManager transformationConnectionManager, IOutputConnectionManager outputConnectionManager,
+      IPipelineSpecification pipelineSpecification,
       Map<String,QueuedDocument> previousDocuments,
       long currentTime,
       Long expireInterval,
@@ -1318,8 +1131,6 @@ public class WorkerThread extends Thread
       this.ingester = ingester;
       this.connectionName = connectionName;
       this.pipelineSpecification = pipelineSpecification;
-      this.transformationConnectionManager = transformationConnectionManager;
-      this.outputConnectionManager = outputConnectionManager;
       this.previousDocuments = previousDocuments;
       this.currentTime = currentTime;
       this.expireInterval = expireInterval;
@@ -1409,7 +1220,7 @@ public class WorkerThread extends Thread
     {
       String documentIdentifierHash = ManifoldCF.hash(documentIdentifier);
       String componentIdentifierHash = computeComponentIDHash(componentIdentifier);
-      IPipelineSpecificationWithVersions spec = computePipelineSpecification(documentIdentifierHash,componentIdentifierHash,documentIdentifier);
+      IPipelineSpecificationWithVersions spec = computePipelineSpecificationWithVersions(documentIdentifierHash,componentIdentifierHash,documentIdentifier);
       return ingester.checkFetchDocument(spec,newVersionString,connection.getACLAuthority());
     }
 
@@ -1634,7 +1445,7 @@ public class WorkerThread extends Thread
       String componentIdentifierHash = computeComponentIDHash(componentIdentifier);
       checkMultipleDispositions(documentIdentifier,componentIdentifier,componentIdentifierHash);
       ingester.documentRecord(
-        pipelineSpecification.getBasicPipelineSpecification(),
+        pipelineSpecification,
         connectionName,documentIdentifierHash,componentIdentifierHash,
         version,currentTime);
       touchedSet.add(documentIdentifier);
@@ -1690,7 +1501,7 @@ public class WorkerThread extends Thread
       // indicates that it should always be refetched.  But I have no way to describe this situation
       // in the database at the moment.
       ingester.documentIngest(
-        new PipelineConnectionsWithVersions(getPipelineConnections(),computePipelineSpecification(documentIdentifierHash,componentIdentifierHash,documentIdentifier)),
+        computePipelineSpecificationWithVersions(documentIdentifierHash,componentIdentifierHash,documentIdentifier),
         connectionName,documentIdentifierHash,componentIdentifierHash,
         version,
         connection.getACLAuthority(),
@@ -1733,7 +1544,7 @@ public class WorkerThread extends Thread
       checkMultipleDispositions(documentIdentifier,componentIdentifier,componentIdentifierHash);
 
       ingester.documentNoData(
-        new PipelineConnectionsWithVersions(getPipelineConnections(),computePipelineSpecification(documentIdentifierHash,componentIdentifierHash,documentIdentifier)),
+        computePipelineSpecificationWithVersions(documentIdentifierHash,componentIdentifierHash,documentIdentifier),
         connectionName,documentIdentifierHash,componentIdentifierHash,
         version,
         connection.getACLAuthority(),
@@ -1758,7 +1569,7 @@ public class WorkerThread extends Thread
 
       String documentIdentifierHash = ManifoldCF.hash(documentIdentifier);
       ingester.documentRemove(
-        pipelineSpecification.getBasicPipelineSpecification(),
+        pipelineSpecification,
         connectionName,documentIdentifierHash,null,
         ingestLogger);
         
@@ -2123,7 +1934,7 @@ public class WorkerThread extends Thread
       throws ManifoldCFException, ServiceInterruption
     {
       return ingester.checkDateIndexable(
-        getPipelineConnections(),date,
+        pipelineSpecification,date,
         ingestLogger);
     }
 
@@ -2136,7 +1947,7 @@ public class WorkerThread extends Thread
       throws ManifoldCFException, ServiceInterruption
     {
       return ingester.checkMimeTypeIndexable(
-        getPipelineConnections(),mimeType,
+        pipelineSpecification,mimeType,
         ingestLogger);
     }
 
@@ -2149,7 +1960,7 @@ public class WorkerThread extends Thread
       throws ManifoldCFException, ServiceInterruption
     {
       return ingester.checkDocumentIndexable(
-        getPipelineConnections(),localFile,
+        pipelineSpecification,localFile,
         ingestLogger);
     }
 
@@ -2162,7 +1973,7 @@ public class WorkerThread extends Thread
       throws ManifoldCFException, ServiceInterruption
     {
       return ingester.checkLengthIndexable(
-        getPipelineConnections(),length,
+        pipelineSpecification,length,
         ingestLogger);
     }
 
@@ -2176,7 +1987,7 @@ public class WorkerThread extends Thread
       throws ManifoldCFException, ServiceInterruption
     {
       return ingester.checkURLIndexable(
-        getPipelineConnections(),url,
+        pipelineSpecification,url,
         ingestLogger);
     }
 
@@ -2264,7 +2075,7 @@ public class WorkerThread extends Thread
       components.add(componentIdentifierHash);
     }
     
-    protected IPipelineSpecificationWithVersions computePipelineSpecification(String documentIdentifierHash,
+    protected IPipelineSpecificationWithVersions computePipelineSpecificationWithVersions(String documentIdentifierHash,
       String componentIdentifierHash,
       String documentIdentifier)
     {