You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) is not preserved in this text.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/12/10 14:42:00 UTC

svn commit: r1644404 [2/2] - in /manifoldcf/branches/dev_1x: ./ framework/ framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/ framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/ framework/agents/src/main/...

Modified: manifoldcf/branches/dev_1x/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecificationWithVersions.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/dev_1x/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecificationWithVersions.java?rev=1644404&r1=1644403&r2=1644404&view=diff
==============================================================================
--- manifoldcf/branches/dev_1x/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecificationWithVersions.java (original)
+++ manifoldcf/branches/dev_1x/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecificationWithVersions.java Wed Dec 10 13:42:00 2014
@@ -38,24 +38,140 @@ public class PipelineSpecificationWithVe
     this.componentIDHash = componentIDHash;
   }
   
-  /** Get pipeline specification.
-  *@return the pipeline specification.
+  protected DocumentIngestStatus getStatus(int index)
+  {
+    DocumentIngestStatusSet set = queuedDocument.getLastIngestedStatus(pipelineSpecification.getStageConnectionName(pipelineSpecification.getOutputStage(index)));
+    if (set == null)
+      return null;
+    return set.getComponent(componentIDHash);
+  }
+
+  /** Get a count of all stages.
+  *@return the total count of all stages.
+  */
+  @Override
+  public int getStageCount()
+  {
+    return pipelineSpecification.getStageCount();
+  }
+  
+  /** Find children of a given pipeline stage.  Pass -1 to find the children of the root stage.
+  *@param stage is the stage index to get the children of.
+  *@return the pipeline stages that represent those children.
+  */
+  @Override
+  public int[] getStageChildren(int stage)
+  {
+    return pipelineSpecification.getStageChildren(stage);
+  }
+  
+  /** Find parent of a given pipeline stage.  Returns -1 if there's no parent (it's the root).
+  *@param stage is the stage index to get the parent of.
+  *@return the pipeline stage that is the parent, or -1.
   */
   @Override
-  public IPipelineSpecification getPipelineSpecification()
+  public int getStageParent(int stage)
   {
-    return pipelineSpecification;
+    return pipelineSpecification.getStageParent(stage);
   }
 
-  protected DocumentIngestStatus getStatus(int index)
+  /** Get the connection name for a pipeline stage.
+  *@param stage is the stage to get the connection name for.
+  *@return the connection name for that stage.
+  */
+  @Override
+  public String getStageConnectionName(int stage)
   {
-    IPipelineSpecificationBasic basic = pipelineSpecification.getBasicPipelineSpecification();
-    DocumentIngestStatusSet set = queuedDocument.getLastIngestedStatus(basic.getStageConnectionName(basic.getOutputStage(index)));
-    if (set == null)
-      return null;
-    return set.getComponent(componentIDHash);
+    return pipelineSpecification.getStageConnectionName(stage);
+  }
+
+  /** Check if a stage is an output stage.
+  *@param stage is the stage to check.
+  *@return true if the stage represents an output connection.
+  */
+  @Override
+  public boolean checkStageOutputConnection(int stage)
+  {
+    return pipelineSpecification.checkStageOutputConnection(stage);
+  }
+  
+  /** Return the number of output connections.
+  *@return the total number of output connections in this specification.
+  */
+  @Override
+  public int getOutputCount()
+  {
+    return pipelineSpecification.getOutputCount();
   }
   
+  /** Given an output index, return the stage number for that output.
+  *@param index is the output connection index.
+  *@return the stage number.
+  */
+  @Override
+  public int getOutputStage(int index)
+  {
+    return pipelineSpecification.getOutputStage(index);
+  }
+
+  /** Get the transformation connection names mentioned by the IPipelineSpecification
+  * object. */
+  @Override
+  public String[] getTransformationConnectionNames()
+  {
+    return pipelineSpecification.getTransformationConnectionNames();
+  }
+  
+  /** Get the transformation connection instances mentioned by the IPipelineSpecification
+  * object. */
+  @Override
+  public ITransformationConnection[] getTransformationConnections()
+  {
+    return pipelineSpecification.getTransformationConnections();
+  }
+  
+  /** Get the output connection names mentioned by the IPipelineSpecification
+  * object. */
+  @Override
+  public String[] getOutputConnectionNames()
+  {
+    return pipelineSpecification.getOutputConnectionNames();
+  }
+  
+  /** Get the output connection instances mentioned by the IPipelineSpecification
+  * object. */
+  @Override
+  public IOutputConnection[] getOutputConnections()
+  {
+    return pipelineSpecification.getOutputConnections();
+  }
+  
+  /** Get the index of the transformation connection corresponding to a
+  * specific pipeline stage. */
+  @Override
+  public Integer getTransformationConnectionIndex(int stage)
+  {
+    return pipelineSpecification.getTransformationConnectionIndex(stage);
+  }
+  
+  /** Get the index of the output connection corresponding to a
+  * specific pipeline stage. */
+  @Override
+  public Integer getOutputConnectionIndex(int stage)
+  {
+    return pipelineSpecification.getOutputConnectionIndex(stage);
+  }
+
+  /** Get the description string for a pipeline stage.
+  *@param stage is the stage to get the connection name for.
+  *@return the description string that stage.
+  */
+  @Override
+  public VersionContext getStageDescriptionString(int stage)
+  {
+    return pipelineSpecification.getStageDescriptionString(stage);
+  }
+
   /** For a given output index, return a document version string.
   *@param index is the output index.
   *@return the document version string.

Modified: manifoldcf/branches/dev_1x/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/dev_1x/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1644404&r1=1644403&r2=1644404&view=diff
==============================================================================
--- manifoldcf/branches/dev_1x/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java (original)
+++ manifoldcf/branches/dev_1x/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java Wed Dec 10 13:42:00 2014
@@ -143,8 +143,9 @@ public class WorkerThread extends Thread
 
             // Build a basic pipeline specification right off; we need it whenever
             // we interact with Incremental Ingester.
-            IPipelineSpecificationBasic pipelineSpecificationBasic = new PipelineSpecificationBasic(job);
-            String lastIndexedOutputConnectionName = ingester.getLastIndexedOutputConnectionName(pipelineSpecificationBasic);
+            IPipelineConnections pipelineConnections = new PipelineConnections(new PipelineSpecificationBasic(job),transformationConnectionManager,outputConnectionManager);
+            
+            String lastIndexedOutputConnectionName = ingester.getLastIndexedOutputConnectionName(pipelineConnections);
             // Compute a parameter version string for all documents in this job
             String newParameterVersion = packParameters(job.getForcedMetadata());
 
@@ -291,7 +292,7 @@ public class WorkerThread extends Thread
                   IPipelineSpecification pipelineSpecification;
                   try
                   {
-                    pipelineSpecification = new PipelineSpecification(pipelineSpecificationBasic,job,ingester);
+                    pipelineSpecification = new PipelineSpecification(pipelineConnections,job,ingester);
                   }
                   catch (ServiceInterruption e)
                   {
@@ -360,7 +361,7 @@ public class WorkerThread extends Thread
                     
                     ProcessActivity activity = new ProcessActivity(job.getID(),processID,
                       rt,jobManager,ingester,
-                      connectionName,pipelineSpecification,transformationConnectionManager,outputConnectionManager,
+                      connectionName,pipelineSpecification,
                       previousDocuments,
                       currentTime,
                       job.getExpiration(),
@@ -392,7 +393,7 @@ public class WorkerThread extends Thread
                             String documentIdentifierHash = qd.getDocumentDescription().getDocumentIdentifierHash();
                             // In order to be able to loop over all the components that the incremental ingester knows about, we need to know
                             // what the FIRST output is.
-                            DocumentIngestStatusSet set = qd.getLastIngestedStatus(ingester.getFirstIndexedOutputConnectionName(pipelineSpecificationBasic));
+                            DocumentIngestStatusSet set = qd.getLastIngestedStatus(ingester.getFirstIndexedOutputConnectionName(pipelineConnections));
                             if (set != null)
                             {
                               Iterator<String> componentHashes = set.componentIterator();
@@ -405,7 +406,7 @@ public class WorkerThread extends Thread
                                 {
                                   // This component must be removed.
                                   ingester.documentRemove(
-                                    pipelineSpecificationBasic,
+                                    pipelineConnections,
                                     connectionName,documentIdentifierHash,componentHash,
                                     ingestLogger);
                                 }
@@ -514,7 +515,7 @@ public class WorkerThread extends Thread
                         }
                         // This method should exercise reasonable intelligence.  If the document has never been indexed, it should detect that
                         // and stop.  Otherwise, it should update the statistics accordingly.
-                        ingester.documentCheckMultiple(pipelineSpecificationBasic,checkClasses,checkIDs,currentTime);
+                        ingester.documentCheckMultiple(pipelineConnections,checkClasses,checkIDs,currentTime);
                       }
 
                       // Process the finish list!
@@ -549,7 +550,7 @@ public class WorkerThread extends Thread
                               timeIDClasses[i] = connectionName;
                               timeIDHashes[i] = documentIDHash;
                             }
-                            long[] timeArray = ingester.getDocumentUpdateIntervalMultiple(pipelineSpecificationBasic,timeIDClasses,timeIDHashes);
+                            long[] timeArray = ingester.getDocumentUpdateIntervalMultiple(pipelineConnections,timeIDClasses,timeIDHashes);
                             Long[] recheckTimeArray = new Long[timeArray.length];
                             int[] actionArray = new int[timeArray.length];
                             DocumentDescription[] recrawlDocs = new DocumentDescription[finishList.size()];
@@ -690,12 +691,12 @@ public class WorkerThread extends Thread
                   }
                   
                   // Now, handle the delete list
-                  processDeleteLists(pipelineSpecificationBasic,connector,connection,jobManager,
+                  processDeleteLists(pipelineConnections,connector,connection,jobManager,
                     deleteList,ingester,
                     job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),rt,currentTime);
 
                   // Handle hopcount removal
-                  processHopcountRemovalLists(pipelineSpecificationBasic,connector,connection,jobManager,
+                  processHopcountRemovalLists(pipelineConnections,connector,connection,jobManager,
                     hopcountremoveList,ingester,
                     job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),rt,currentTime);
 
@@ -845,7 +846,7 @@ public class WorkerThread extends Thread
   * of what the deletion method must do.  Specifically, it should be capable of deleting
   * documents from the index should they be already present.
   */
-  protected static void processHopcountRemovalLists(IPipelineSpecificationBasic pipelineSpecificationBasic,
+  protected static void processHopcountRemovalLists(IPipelineConnections pipelineConnections,
     IRepositoryConnector connector,
     IRepositoryConnection connection, IJobManager jobManager,
     List<QueuedDocument> hopcountremoveList,
@@ -855,20 +856,20 @@ public class WorkerThread extends Thread
     throws ManifoldCFException
   {
     // Remove from index
-    hopcountremoveList = removeFromIndex(pipelineSpecificationBasic,connection.getName(),jobManager,hopcountremoveList,ingester,ingestLogger);
+    hopcountremoveList = removeFromIndex(pipelineConnections,connection.getName(),jobManager,hopcountremoveList,ingester,ingestLogger);
     // Mark as 'hopcountremoved' in the job queue
     processJobQueueHopcountRemovals(hopcountremoveList,connector,connection,
       jobManager,jobID,legalLinkTypes,hopcountMethod,rt,currentTime);
   }
 
   /** Clear specified documents out of the job queue and from the appliance.
-  *@param pipelineSpecificationBasic is the basic pipeline specification for this job.
+  *@param pipelineConnections is the basic pipeline specification for this job.
   *@param jobManager is the job manager.
   *@param deleteList is a list of QueuedDocument objects to clean out.
   *@param ingester is the handle to the incremental ingestion API control object.
   *@param ingesterDeleteList is a list of document id's to delete.
   */
-  protected static void processDeleteLists(IPipelineSpecificationBasic pipelineSpecificationBasic,
+  protected static void processDeleteLists(IPipelineConnections pipelineConnections,
     IRepositoryConnector connector,
     IRepositoryConnection connection, IJobManager jobManager,
     List<QueuedDocument> deleteList,
@@ -878,7 +879,7 @@ public class WorkerThread extends Thread
     throws ManifoldCFException
   {
     // Remove from index
-    deleteList = removeFromIndex(pipelineSpecificationBasic,connection.getName(),jobManager,deleteList,ingester,ingestLogger);
+    deleteList = removeFromIndex(pipelineConnections,connection.getName(),jobManager,deleteList,ingester,ingestLogger);
     // Delete from the job queue
     processJobQueueDeletions(deleteList,connector,connection,
       jobManager,jobID,legalLinkTypes,hopcountMethod,rt,currentTime);
@@ -887,7 +888,7 @@ public class WorkerThread extends Thread
   /** Remove a specified set of documents from the index.
   *@return the list of documents whose state needs to be updated in jobqueue.
   */
-  protected static List<QueuedDocument> removeFromIndex(IPipelineSpecificationBasic pipelineSpecificationBasic,
+  protected static List<QueuedDocument> removeFromIndex(IPipelineConnections pipelineConnections,
     String connectionName, IJobManager jobManager, List<QueuedDocument> deleteList, 
     IIncrementalIngester ingester, OutputActivity ingestLogger)
     throws ManifoldCFException
@@ -919,7 +920,7 @@ public class WorkerThread extends Thread
       // Try to delete the documents via the output connection.
       try
       {
-        ingester.documentDeleteMultiple(pipelineSpecificationBasic,deleteClasses,deleteIDs,ingestLogger);
+        ingester.documentDeleteMultiple(pipelineConnections,deleteClasses,deleteIDs,ingestLogger);
       }
       catch (ServiceInterruption e)
       {
@@ -1110,181 +1111,6 @@ public class WorkerThread extends Thread
 
   // Nested classes
 
-  /** Pipeline connections implementation.
-  */
-  protected static class PipelineConnections implements IPipelineConnections
-  {
-    protected final IPipelineSpecification spec;
-    protected final String[] transformationConnectionNames;
-    protected final ITransformationConnection[] transformationConnections;
-    protected final String[] outputConnectionNames;
-    protected final IOutputConnection[] outputConnections;
-    // We need a way to get from stage index to connection index.
-    // These arrays are looked up by stage index, and return the appropriate connection index.
-    protected final Map<Integer,Integer> transformationConnectionLookupMap = new HashMap<Integer,Integer>();
-    protected final Map<Integer,Integer> outputConnectionLookupMap = new HashMap<Integer,Integer>();
-    
-    public PipelineConnections(ITransformationConnectionManager transformationConnectionManager,
-      IOutputConnectionManager outputConnectionManager, IPipelineSpecification spec)
-      throws ManifoldCFException
-    {
-      this.spec = spec;
-      IPipelineSpecificationBasic basicSpec = spec.getBasicPipelineSpecification();
-      // Now, load all the connections we'll ever need, being sure to only load one copy of each.
-      // We first segregate them into unique transformation and output connections.
-      int count = basicSpec.getStageCount();
-      Set<String> transformations = new HashSet<String>();
-      Set<String> outputs = new HashSet<String>();
-      for (int i = 0; i < count; i++)
-      {
-        if (basicSpec.checkStageOutputConnection(i))
-          outputs.add(basicSpec.getStageConnectionName(i));
-        else
-          transformations.add(basicSpec.getStageConnectionName(i));
-      }
-      
-      Map<String,Integer> transformationNameMap = new HashMap<String,Integer>();
-      Map<String,Integer> outputNameMap = new HashMap<String,Integer>();
-      transformationConnectionNames = new String[transformations.size()];
-      outputConnectionNames = new String[outputs.size()];
-      int index = 0;
-      for (String connectionName : transformations)
-      {
-        transformationConnectionNames[index] = connectionName;
-        transformationNameMap.put(connectionName,new Integer(index++));
-      }
-      index = 0;
-      for (String connectionName : outputs)
-      {
-        outputConnectionNames[index] = connectionName;
-        outputNameMap.put(connectionName,new Integer(index++));
-      }
-      // Load!
-      transformationConnections = transformationConnectionManager.loadMultiple(transformationConnectionNames);
-      outputConnections = outputConnectionManager.loadMultiple(outputConnectionNames);
-      
-      for (int i = 0; i < count; i++)
-      {
-        Integer k;
-        if (basicSpec.checkStageOutputConnection(i))
-        {
-          outputConnectionLookupMap.put(new Integer(i),outputNameMap.get(basicSpec.getStageConnectionName(i)));
-        }
-        else
-        {
-          transformationConnectionLookupMap.put(new Integer(i),transformationNameMap.get(basicSpec.getStageConnectionName(i)));
-        }
-      }
-    }
-    
-    @Override
-    public IPipelineSpecification getSpecification()
-    {
-      return spec;
-    }
-    
-    @Override
-    public String[] getTransformationConnectionNames()
-    {
-      return transformationConnectionNames;
-    }
-    
-    @Override
-    public ITransformationConnection[] getTransformationConnections()
-    {
-      return transformationConnections;
-    }
-    
-    @Override
-    public String[] getOutputConnectionNames()
-    {
-      return outputConnectionNames;
-    }
-    
-    @Override
-    public IOutputConnection[] getOutputConnections()
-    {
-      return outputConnections;
-    }
-    
-    @Override
-    public Integer getTransformationConnectionIndex(int stage)
-    {
-      return transformationConnectionLookupMap.get(new Integer(stage));
-    }
-    
-    @Override
-    public Integer getOutputConnectionIndex(int stage)
-    {
-      return outputConnectionLookupMap.get(new Integer(stage));
-    }
-    
-  }
-
-  /** IPipelineConnectionsWithVersions implementation.
-  */
-  protected static class PipelineConnectionsWithVersions implements IPipelineConnectionsWithVersions
-  {
-    protected final IPipelineConnections pipelineConnections;
-    protected final IPipelineSpecificationWithVersions pipelineSpecificationWithVersions;
-    
-    public PipelineConnectionsWithVersions(IPipelineConnections pipelineConnections, IPipelineSpecificationWithVersions pipelineSpecificationWithVersions)
-      throws ManifoldCFException
-    {
-      this.pipelineConnections = pipelineConnections;
-      this.pipelineSpecificationWithVersions = pipelineSpecificationWithVersions;
-    }
-    
-    @Override
-    public IPipelineSpecification getSpecification()
-    {
-      return pipelineConnections.getSpecification();
-    }
-    
-    @Override
-    public String[] getTransformationConnectionNames()
-    {
-      return pipelineConnections.getTransformationConnectionNames();
-    }
-    
-    @Override
-    public ITransformationConnection[] getTransformationConnections()
-    {
-      return pipelineConnections.getTransformationConnections();
-    }
-    
-    @Override
-    public String[] getOutputConnectionNames()
-    {
-      return pipelineConnections.getOutputConnectionNames();
-    }
-    
-    @Override
-    public IOutputConnection[] getOutputConnections()
-    {
-      return pipelineConnections.getOutputConnections();
-    }
-    
-    @Override
-    public Integer getTransformationConnectionIndex(int stage)
-    {
-      return pipelineConnections.getTransformationConnectionIndex(stage);
-    }
-    
-    @Override
-    public Integer getOutputConnectionIndex(int stage)
-    {
-      return pipelineConnections.getOutputConnectionIndex(stage);
-    }
-
-    @Override
-    public IPipelineSpecificationWithVersions getSpecificationWithVersions()
-    {
-      return pipelineSpecificationWithVersions;
-    }
-    
-  }
-
   /** Process activity class wraps access to the ingester and job queue.
   */
   protected static class ProcessActivity implements IProcessActivity
@@ -1296,8 +1122,6 @@ public class WorkerThread extends Thread
     protected final IIncrementalIngester ingester;
     protected final String connectionName;
     protected final IPipelineSpecification pipelineSpecification;
-    protected final ITransformationConnectionManager transformationConnectionManager;
-    protected final IOutputConnectionManager outputConnectionManager;
     protected final Map<String,QueuedDocument> previousDocuments;
     protected final long currentTime;
     protected final Long expireInterval;
@@ -1313,9 +1137,6 @@ public class WorkerThread extends Thread
     protected final IReprioritizationTracker rt;
     protected final String parameterVersion;
 
-    protected IPipelineConnections pipelineConnections = null;
-    protected IPipelineConnectionsWithVersions pipelineConnectionsWithVersions = null;
-    
     // We submit references in bulk, because that's way more efficient.
     protected final Map<DocumentReference,DocumentReference> referenceList = new HashMap<DocumentReference,DocumentReference>();
 
@@ -1345,14 +1166,6 @@ public class WorkerThread extends Thread
     // This represents primary documents.
     protected final Set<String> touchedPrimarySet = new HashSet<String>();
     
-    protected IPipelineConnections getPipelineConnections()
-      throws ManifoldCFException
-    {
-      if (pipelineConnections == null)
-        pipelineConnections = new PipelineConnections(transformationConnectionManager,outputConnectionManager,pipelineSpecification);
-      return pipelineConnections;
-    }
-    
     /** Constructor.
     *@param jobManager is the job manager
     *@param ingester is the ingester
@@ -1361,7 +1174,7 @@ public class WorkerThread extends Thread
       IReprioritizationTracker rt, IJobManager jobManager,
       IIncrementalIngester ingester,
       String connectionName,
-      IPipelineSpecification pipelineSpecification, ITransformationConnectionManager transformationConnectionManager, IOutputConnectionManager outputConnectionManager,
+      IPipelineSpecification pipelineSpecification,
       Map<String,QueuedDocument> previousDocuments,
       long currentTime,
       Long expireInterval,
@@ -1380,8 +1193,6 @@ public class WorkerThread extends Thread
       this.ingester = ingester;
       this.connectionName = connectionName;
       this.pipelineSpecification = pipelineSpecification;
-      this.transformationConnectionManager = transformationConnectionManager;
-      this.outputConnectionManager = outputConnectionManager;
       this.previousDocuments = previousDocuments;
       this.currentTime = currentTime;
       this.expireInterval = expireInterval;
@@ -1473,7 +1284,7 @@ public class WorkerThread extends Thread
     {
       String documentIdentifierHash = ManifoldCF.hash(documentIdentifier);
       String componentIdentifierHash = computeComponentIDHash(componentIdentifier);
-      IPipelineSpecificationWithVersions spec = computePipelineSpecification(documentIdentifierHash,componentIdentifierHash,documentIdentifier);
+      IPipelineSpecificationWithVersions spec = computePipelineSpecificationWithVersions(documentIdentifierHash,componentIdentifierHash,documentIdentifier);
       return ingester.checkFetchDocument(spec,newVersionString,parameterVersion,connection.getACLAuthority());
     }
 
@@ -1698,7 +1509,7 @@ public class WorkerThread extends Thread
       String componentIdentifierHash = computeComponentIDHash(componentIdentifier);
       checkMultipleDispositions(documentIdentifier,componentIdentifier,componentIdentifierHash);
       ingester.documentRecord(
-        pipelineSpecification.getBasicPipelineSpecification(),
+        pipelineSpecification,
         connectionName,documentIdentifierHash,componentIdentifierHash,
         version,currentTime);
       touchedSet.add(documentIdentifier);
@@ -1797,7 +1608,7 @@ public class WorkerThread extends Thread
       // indicates that it should always be refetched.  But I have no way to describe this situation
       // in the database at the moment.
       ingester.documentIngest(
-        new PipelineConnectionsWithVersions(getPipelineConnections(),computePipelineSpecification(documentIdentifierHash,componentIdentifierHash,documentIdentifier)),
+        computePipelineSpecificationWithVersions(documentIdentifierHash,componentIdentifierHash,documentIdentifier),
         connectionName,documentIdentifierHash,componentIdentifierHash,
         version,parameterVersion,
         connection.getACLAuthority(),
@@ -1840,7 +1651,7 @@ public class WorkerThread extends Thread
       checkMultipleDispositions(documentIdentifier,componentIdentifier,componentIdentifierHash);
 
       ingester.documentNoData(
-        new PipelineConnectionsWithVersions(getPipelineConnections(),computePipelineSpecification(documentIdentifierHash,componentIdentifierHash,documentIdentifier)),
+        computePipelineSpecificationWithVersions(documentIdentifierHash,componentIdentifierHash,documentIdentifier),
         connectionName,documentIdentifierHash,componentIdentifierHash,
         version,parameterVersion,
         connection.getACLAuthority(),
@@ -1865,7 +1676,7 @@ public class WorkerThread extends Thread
 
       String documentIdentifierHash = ManifoldCF.hash(documentIdentifier);
       ingester.documentRemove(
-        pipelineSpecification.getBasicPipelineSpecification(),
+        pipelineSpecification,
         connectionName,documentIdentifierHash,null,
         ingestLogger);
         
@@ -2244,7 +2055,7 @@ public class WorkerThread extends Thread
       throws ManifoldCFException, ServiceInterruption
     {
       return ingester.checkDateIndexable(
-        getPipelineConnections(),date,
+        pipelineSpecification,date,
         ingestLogger);
     }
 
@@ -2257,7 +2068,7 @@ public class WorkerThread extends Thread
       throws ManifoldCFException, ServiceInterruption
     {
       return ingester.checkMimeTypeIndexable(
-        getPipelineConnections(),mimeType,
+        pipelineSpecification,mimeType,
         ingestLogger);
     }
 
@@ -2270,7 +2081,7 @@ public class WorkerThread extends Thread
       throws ManifoldCFException, ServiceInterruption
     {
       return ingester.checkDocumentIndexable(
-        getPipelineConnections(),localFile,
+        pipelineSpecification,localFile,
         ingestLogger);
     }
 
@@ -2283,7 +2094,7 @@ public class WorkerThread extends Thread
       throws ManifoldCFException, ServiceInterruption
     {
       return ingester.checkLengthIndexable(
-        getPipelineConnections(),length,
+        pipelineSpecification,length,
         ingestLogger);
     }
 
@@ -2297,7 +2108,7 @@ public class WorkerThread extends Thread
       throws ManifoldCFException, ServiceInterruption
     {
       return ingester.checkURLIndexable(
-        getPipelineConnections(),url,
+        pipelineSpecification,url,
         ingestLogger);
     }
 
@@ -2385,7 +2196,7 @@ public class WorkerThread extends Thread
       components.add(componentIdentifierHash);
     }
     
-    protected IPipelineSpecificationWithVersions computePipelineSpecification(String documentIdentifierHash,
+    protected IPipelineSpecificationWithVersions computePipelineSpecificationWithVersions(String documentIdentifierHash,
       String componentIdentifierHash,
       String documentIdentifier)
     {