You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/12/09 20:38:30 UTC

svn commit: r1644162 - in /manifoldcf/branches/CONNECTORS-1118/framework: agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/ agents/src/main/java/org/apache/manifoldcf/agents/interfaces/ pull-agent/src/main/java/org/apache/manifoldcf/...

Author: kwright
Date: Tue Dec  9 19:38:30 2014
New Revision: 1644162

URL: http://svn.apache.org/r1644162
Log:
Change IIncrementalIngester api to allow for minimal work setting up pipelines.

Modified:
    manifoldcf/branches/CONNECTORS-1118/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
    manifoldcf/branches/CONNECTORS-1118/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java
    manifoldcf/branches/CONNECTORS-1118/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java

Modified: manifoldcf/branches/CONNECTORS-1118/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1118/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java?rev=1644162&r1=1644161&r2=1644162&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-1118/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java (original)
+++ manifoldcf/branches/CONNECTORS-1118/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java Tue Dec  9 19:38:30 2014
@@ -228,20 +228,19 @@ public class IncrementalIngester extends
   }
 
   /** Check if a date is indexable.
-  *@param pipelineSpecification is the pipeline specification.
+  *@param pipelineConnections is the IPipelineConnections object for this pipeline.
   *@param date is the date to check.
   *@param activity are the activities available to this method.
   *@return true if the mimeType is indexable.
   */
   @Override
   public boolean checkDateIndexable(
-    IPipelineSpecification pipelineSpecification,
+    IPipelineConnections pipelineConnections,
     Date date,
     IOutputCheckActivity activity)
     throws ManifoldCFException, ServiceInterruption
   {
-    PipelineObject pipeline = pipelineGrab(
-      new PipelineConnections(pipelineSpecification));
+    PipelineObject pipeline = pipelineGrab(pipelineConnections);
     if (pipeline == null)
       // A connector is not installed; treat this as a service interruption.
       throw new ServiceInterruption("One or more connectors are not installed",0L);
@@ -256,20 +255,19 @@ public class IncrementalIngester extends
   }
 
   /** Check if a mime type is indexable.
-  *@param pipelineSpecification is the pipeline specification.
+  *@param pipelineConnections is the pipeline connections object for this pipeline.
   *@param mimeType is the mime type to check.
   *@param activity are the activities available to this method.
   *@return true if the mimeType is indexable.
   */
   @Override
   public boolean checkMimeTypeIndexable(
-    IPipelineSpecification pipelineSpecification,
+    IPipelineConnections pipelineConnections,
     String mimeType,
     IOutputCheckActivity activity)
     throws ManifoldCFException, ServiceInterruption
   {
-    PipelineObject pipeline = pipelineGrab(
-      new PipelineConnections(pipelineSpecification));
+    PipelineObject pipeline = pipelineGrab(pipelineConnections);
     if (pipeline == null)
       // A connector is not installed; treat this as a service interruption.
       throw new ServiceInterruption("One or more connectors are not installed",0L);
@@ -284,20 +282,19 @@ public class IncrementalIngester extends
   }
 
   /** Check if a file is indexable.
-  *@param pipelineSpecification is the pipeline specification.
+  *@param pipelineConnections is the pipeline connections object for this pipeline.
   *@param localFile is the local file to check.
   *@param activity are the activities available to this method.
   *@return true if the local file is indexable.
   */
   @Override
   public boolean checkDocumentIndexable(
-    IPipelineSpecification pipelineSpecification,
+    IPipelineConnections pipelineConnections,
     File localFile,
     IOutputCheckActivity activity)
     throws ManifoldCFException, ServiceInterruption
   {
-    PipelineObject pipeline = pipelineGrab(
-      new PipelineConnections(pipelineSpecification));
+    PipelineObject pipeline = pipelineGrab(pipelineConnections);
     if (pipeline == null)
       // A connector is not installed; treat this as a service interruption.
       throw new ServiceInterruption("One or more connectors are not installed",0L);
@@ -313,20 +310,19 @@ public class IncrementalIngester extends
 
   /** Pre-determine whether a document's length is indexable by this connector.  This method is used by participating repository connectors
   * to help filter out documents that are too long to be indexable.
-  *@param pipelineSpecification is the pipeline specification.
+  *@param pipelineConnections is the pipeline connections object for this pipeline.
   *@param length is the length of the document.
   *@param activity are the activities available to this method.
   *@return true if the file is indexable.
   */
   @Override
   public boolean checkLengthIndexable(
-    IPipelineSpecification pipelineSpecification,
+    IPipelineConnections pipelineConnections,
     long length,
     IOutputCheckActivity activity)
     throws ManifoldCFException, ServiceInterruption
   {
-    PipelineObject pipeline = pipelineGrab(
-      new PipelineConnections(pipelineSpecification));
+    PipelineObject pipeline = pipelineGrab(pipelineConnections);
     if (pipeline == null)
       // A connector is not installed; treat this as a service interruption.
       throw new ServiceInterruption("One or more connectors are not installed",0L);
@@ -342,20 +338,19 @@ public class IncrementalIngester extends
 
   /** Pre-determine whether a document's URL is indexable by this connector.  This method is used by participating repository connectors
   * to help filter out documents that not indexable.
-  *@param pipelineSpecification is the pipeline specification.
+  *@param pipelineConnections is the pipeline connections object for this pipeline.
   *@param url is the url of the document.
   *@param activity are the activities available to this method.
   *@return true if the file is indexable.
   */
   @Override
   public boolean checkURLIndexable(
-    IPipelineSpecification pipelineSpecification,
+    IPipelineConnections pipelineConnections,
     String url,
     IOutputCheckActivity activity)
     throws ManifoldCFException, ServiceInterruption
   {
-    PipelineObject pipeline = pipelineGrab(
-      new PipelineConnections(pipelineSpecification));
+    PipelineObject pipeline = pipelineGrab(pipelineConnections);
     if (pipeline == null)
       // A connector is not installed; treat this as a service interruption.
       throw new ServiceInterruption("One or more connectors are not installed",0L);
@@ -376,7 +371,7 @@ public class IncrementalIngester extends
   *@param outputDescriptionString - the output description string
   *@return the pipeline description, or null if any part of the pipeline cannot be grabbed.
   */
-  protected PipelineObjectWithVersions pipelineGrabWithVersions(PipelineConnectionsWithVersions pipelineConnections)
+  protected PipelineObjectWithVersions pipelineGrabWithVersions(IPipelineConnectionsWithVersions pipelineConnections)
     throws ManifoldCFException
   {
     // Pick up all needed transformation connectors
@@ -426,7 +421,7 @@ public class IncrementalIngester extends
   *@param outputDescriptionString - the output description string
   *@return the pipeline description, or null if any part of the pipeline cannot be grabbed.
   */
-  protected PipelineObject pipelineGrab(PipelineConnections pipelineConnections)
+  protected PipelineObject pipelineGrab(IPipelineConnections pipelineConnections)
     throws ManifoldCFException
   {
     // Pick up all needed transformation connectors
@@ -679,7 +674,7 @@ public class IncrementalIngester extends
   * This method is conceptually similar to documentIngest(), but does not actually take
   * a document or allow it to be transformed.  If there is a document already
   * indexed, it is removed from the index.
-  *@param pipelineSpecificationWithVersions is the pipeline specification with already-fetched output versioning information.
+  *@param pipelineConnectionsWithVersions is the pipeline connections with already-fetched output versioning information.
   *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
   *@param identifierHash is the hashed document identifier.
   *@param componentHash is the hashed component identifier, if any.
@@ -690,7 +685,7 @@ public class IncrementalIngester extends
   */
   @Override
   public void documentNoData(
-    IPipelineSpecificationWithVersions pipelineSpecificationWithVersions,
+    IPipelineConnectionsWithVersions pipelineConnectionsWithVersions,
     String identifierClass, String identifierHash, String componentHash,
     String documentVersion,
     String authorityName,
@@ -698,13 +693,11 @@ public class IncrementalIngester extends
     IOutputActivity activities)
     throws ManifoldCFException, ServiceInterruption
   {
-    PipelineConnectionsWithVersions pipelineConnectionsWithVersions = new PipelineConnectionsWithVersions(pipelineSpecificationWithVersions);
-    
     String docKey = makeKey(identifierClass,identifierHash);
 
     if (Logging.ingest.isDebugEnabled())
     {
-      Logging.ingest.debug("Logging empty document '"+docKey+"' component hash "+((componentHash==null)?"(None)":("'"+componentHash+"'"))+" into output connections '"+extractOutputConnectionNames(pipelineSpecificationWithVersions.getPipelineSpecification().getBasicPipelineSpecification())+"'");
+      Logging.ingest.debug("Logging empty document '"+docKey+"' component hash "+((componentHash==null)?"(None)":("'"+componentHash+"'"))+" into output connections '"+extractOutputConnectionNames(pipelineConnectionsWithVersions.getSpecification().getBasicPipelineSpecification())+"'");
     }
 
     // Set up a pipeline
@@ -727,7 +720,7 @@ public class IncrementalIngester extends
   * method also REMOVES ALL OLD METADATA.  When complete, the index will contain only the metadata
   * described by the RepositoryDocument object passed to this method.
   * ServiceInterruption is thrown if the document ingestion must be rescheduled.
-  *@param pipelineSpecificationWithVersions is the pipeline specification with already-fetched output versioning information.
+  *@param pipelineConnectionsWithVersions is the pipeline connections with already-fetched output versioning information.
   *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
   *@param identifierHash is the hashed document identifier.
   *@param componentHash is the hashed component identifier, if any.
@@ -742,7 +735,7 @@ public class IncrementalIngester extends
   */
   @Override
   public boolean documentIngest(
-    IPipelineSpecificationWithVersions pipelineSpecificationWithVersions,
+    IPipelineConnectionsWithVersions pipelineConnectionsWithVersions,
     String identifierClass, String identifierHash, String componentHash,
     String documentVersion,
     String authorityName,
@@ -751,13 +744,11 @@ public class IncrementalIngester extends
     IOutputActivity activities)
     throws ManifoldCFException, ServiceInterruption, IOException
   {
-    PipelineConnectionsWithVersions pipelineConnectionsWithVersions = new PipelineConnectionsWithVersions(pipelineSpecificationWithVersions);
-    
     String docKey = makeKey(identifierClass,identifierHash);
 
     if (Logging.ingest.isDebugEnabled())
     {
-      Logging.ingest.debug("Ingesting document '"+docKey+"' component hash "+((componentHash==null)?"(None)":("'"+componentHash+"'"))+" into output connections '"+extractOutputConnectionNames(pipelineSpecificationWithVersions.getPipelineSpecification().getBasicPipelineSpecification())+"'");
+      Logging.ingest.debug("Ingesting document '"+docKey+"' component hash "+((componentHash==null)?"(None)":("'"+componentHash+"'"))+" into output connections '"+extractOutputConnectionNames(pipelineConnectionsWithVersions.getSpecification().getBasicPipelineSpecification())+"'");
     }
 
     // Set indexing date
@@ -2565,12 +2556,12 @@ public class IncrementalIngester extends
   
   protected class PipelineObject
   {
-    public final PipelineConnections pipelineConnections;
+    public final IPipelineConnections pipelineConnections;
     public final IOutputConnector[] outputConnectors;
     public final ITransformationConnector[] transformationConnectors;
     
     public PipelineObject(
-      PipelineConnections pipelineConnections,
+      IPipelineConnections pipelineConnections,
       ITransformationConnector[] transformationConnectors,
       IOutputConnector[] outputConnectors)
     {
@@ -2696,10 +2687,10 @@ public class IncrementalIngester extends
   
   protected class PipelineObjectWithVersions extends PipelineObject
   {
-    protected final PipelineConnectionsWithVersions pipelineConnectionsWithVersions;
+    protected final IPipelineConnectionsWithVersions pipelineConnectionsWithVersions;
     
     public PipelineObjectWithVersions(
-      PipelineConnectionsWithVersions pipelineConnectionsWithVersions,
+      IPipelineConnectionsWithVersions pipelineConnectionsWithVersions,
       ITransformationConnector[] transformationConnectors,
       IOutputConnector[] outputConnectors)
     {
@@ -3638,127 +3629,6 @@ public class IncrementalIngester extends
 
   }
   
-  /** This class caches loaded connections corresponding to a pipeline specification.
-  */
-  protected class PipelineConnections
-  {
-    protected final IPipelineSpecification spec;
-    protected final String[] transformationConnectionNames;
-    protected final ITransformationConnection[] transformationConnections;
-    protected final String[] outputConnectionNames;
-    protected final IOutputConnection[] outputConnections;
-    // We need a way to get from stage index to connection index.
-    // These arrays are looked up by stage index, and return the appropriate connection index.
-    protected final Map<Integer,Integer> transformationConnectionLookupMap = new HashMap<Integer,Integer>();
-    protected final Map<Integer,Integer> outputConnectionLookupMap = new HashMap<Integer,Integer>();
-    
-    public PipelineConnections(IPipelineSpecification spec)
-      throws ManifoldCFException
-    {
-      this.spec = spec;
-      IPipelineSpecificationBasic basicSpec = spec.getBasicPipelineSpecification();
-      // Now, load all the connections we'll ever need, being sure to only load one copy of each.
-      // We first segregate them into unique transformation and output connections.
-      int count = basicSpec.getStageCount();
-      Set<String> transformations = new HashSet<String>();
-      Set<String> outputs = new HashSet<String>();
-      for (int i = 0; i < count; i++)
-      {
-        if (basicSpec.checkStageOutputConnection(i))
-          outputs.add(basicSpec.getStageConnectionName(i));
-        else
-          transformations.add(basicSpec.getStageConnectionName(i));
-      }
-      
-      Map<String,Integer> transformationNameMap = new HashMap<String,Integer>();
-      Map<String,Integer> outputNameMap = new HashMap<String,Integer>();
-      transformationConnectionNames = new String[transformations.size()];
-      outputConnectionNames = new String[outputs.size()];
-      int index = 0;
-      for (String connectionName : transformations)
-      {
-        transformationConnectionNames[index] = connectionName;
-        transformationNameMap.put(connectionName,new Integer(index++));
-      }
-      index = 0;
-      for (String connectionName : outputs)
-      {
-        outputConnectionNames[index] = connectionName;
-        outputNameMap.put(connectionName,new Integer(index++));
-      }
-      // Load!
-      transformationConnections = transformationConnectionManager.loadMultiple(transformationConnectionNames);
-      outputConnections = connectionManager.loadMultiple(outputConnectionNames);
-      
-      for (int i = 0; i < count; i++)
-      {
-        Integer k;
-        if (basicSpec.checkStageOutputConnection(i))
-        {
-          outputConnectionLookupMap.put(new Integer(i),outputNameMap.get(basicSpec.getStageConnectionName(i)));
-        }
-        else
-        {
-          transformationConnectionLookupMap.put(new Integer(i),transformationNameMap.get(basicSpec.getStageConnectionName(i)));
-        }
-      }
-    }
-    
-    public IPipelineSpecification getSpecification()
-    {
-      return spec;
-    }
-    
-    public String[] getTransformationConnectionNames()
-    {
-      return transformationConnectionNames;
-    }
-    
-    public ITransformationConnection[] getTransformationConnections()
-    {
-      return transformationConnections;
-    }
-    
-    public String[] getOutputConnectionNames()
-    {
-      return outputConnectionNames;
-    }
-    
-    public IOutputConnection[] getOutputConnections()
-    {
-      return outputConnections;
-    }
-    
-    public Integer getTransformationConnectionIndex(int stage)
-    {
-      return transformationConnectionLookupMap.get(new Integer(stage));
-    }
-    
-    public Integer getOutputConnectionIndex(int stage)
-    {
-      return outputConnectionLookupMap.get(new Integer(stage));
-    }
-    
-  }
-
-  protected class PipelineConnectionsWithVersions extends PipelineConnections
-  {
-    protected final IPipelineSpecificationWithVersions pipelineSpecificationWithVersions;
-    
-    public PipelineConnectionsWithVersions(IPipelineSpecificationWithVersions pipelineSpecificationWithVersions)
-      throws ManifoldCFException
-    {
-      super(pipelineSpecificationWithVersions.getPipelineSpecification());
-      this.pipelineSpecificationWithVersions = pipelineSpecificationWithVersions;
-    }
-    
-    public IPipelineSpecificationWithVersions getSpecificationWithVersions()
-    {
-      return pipelineSpecificationWithVersions;
-    }
-    
-  }
-  
   /** This class passes everything through, and monitors what happens so that the
   * framework can compensate for any transformation connector coding errors.
   */

Modified: manifoldcf/branches/CONNECTORS-1118/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1118/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java?rev=1644162&r1=1644161&r2=1644162&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-1118/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java (original)
+++ manifoldcf/branches/CONNECTORS-1118/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java Tue Dec  9 19:38:30 2014
@@ -91,63 +91,63 @@ public interface IIncrementalIngester
     throws ManifoldCFException, ServiceInterruption;
 
   /** Check if a document date is indexable.
-  *@param pipelineSpecification is the pipeline specification.
+  *@param pipelineConnections is the IPipelineConnections object for this pipeline.
   *@param date is the date to check
   *@param activity are the activities available to this method.
   *@return true if the document with that date is indexable.
   */
   public boolean checkDateIndexable(
-    IPipelineSpecification pipelineSpecification,
+    IPipelineConnections pipelineConnections,
     Date date,
     IOutputCheckActivity activity)
     throws ManifoldCFException, ServiceInterruption;
 
   /** Check if a mime type is indexable.
-  *@param pipelineSpecification is the pipeline specification.
+  *@param pipelineConnections is the pipeline connections object for this pipeline.
   *@param mimeType is the mime type to check.
   *@param activity are the activities available to this method.
   *@return true if the mimeType is indexable.
   */
   public boolean checkMimeTypeIndexable(
-    IPipelineSpecification pipelineSpecification,
+    IPipelineConnections pipelineConnections,
     String mimeType,
     IOutputCheckActivity activity)
     throws ManifoldCFException, ServiceInterruption;
 
   /** Check if a file is indexable.
-  *@param pipelineSpecification is the pipeline specification.
+  *@param pipelineConnections is the pipeline connections object for this pipeline.
   *@param localFile is the local file to check.
   *@param activity are the activities available to this method.
   *@return true if the local file is indexable.
   */
   public boolean checkDocumentIndexable(
-    IPipelineSpecification pipelineSpecification,
+    IPipelineConnections pipelineConnections,
     File localFile,
     IOutputCheckActivity activity)
     throws ManifoldCFException, ServiceInterruption;
 
   /** Pre-determine whether a document's length is indexable by this connector.  This method is used by participating repository connectors
   * to help filter out documents that are too long to be indexable.
-  *@param pipelineSpecification is the pipeline specification.
+  *@param pipelineConnections is the pipeline connections object for this pipeline.
   *@param length is the length of the document.
   *@param activity are the activities available to this method.
   *@return true if the file is indexable.
   */
   public boolean checkLengthIndexable(
-    IPipelineSpecification pipelineSpecification,
+    IPipelineConnections pipelineConnections,
     long length,
     IOutputCheckActivity activity)
     throws ManifoldCFException, ServiceInterruption;
 
   /** Pre-determine whether a document's URL is indexable by this connector.  This method is used by participating repository connectors
   * to help filter out documents that not indexable.
-  *@param pipelineSpecification is the pipeline specification.
+  *@param pipelineConnections is the pipeline connections object for this pipeline.
   *@param url is the url of the document.
   *@param activity are the activities available to this method.
   *@return true if the file is indexable.
   */
   public boolean checkURLIndexable(
-    IPipelineSpecification pipelineSpecification,
+    IPipelineConnections pipelineConnections,
     String url,
     IOutputCheckActivity activity)
     throws ManifoldCFException, ServiceInterruption;
@@ -186,7 +186,7 @@ public interface IIncrementalIngester
   * This method is conceptually similar to documentIngest(), but does not actually take
   * a document or allow it to be transformed.  If there is a document already
   * indexed, it is removed from the index.
-  *@param pipelineSpecificationWithVersions is the pipeline specification with already-fetched output versioning information.
+  *@param pipelineConnectionsWithVersions is the pipeline connections with already-fetched output versioning information.
   *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
   *@param identifierHash is the hashed document identifier.
   *@param componentHash is the hashed component identifier, if any.
@@ -196,7 +196,7 @@ public interface IIncrementalIngester
   *@param activities is an object providing a set of methods that the implementer can use to perform the operation.
   */
   public void documentNoData(
-    IPipelineSpecificationWithVersions pipelineSpecificationWithVersions,
+    IPipelineConnectionsWithVersions pipelineConnectionsWithVersions,
     String identifierClass, String identifierHash, String componentHash,
     String documentVersion,
     String authorityName,
@@ -209,7 +209,7 @@ public interface IIncrementalIngester
   * method also REMOVES ALL OLD METADATA.  When complete, the index will contain only the metadata
   * described by the RepositoryDocument object passed to this method.
   * ServiceInterruption is thrown if the document ingestion must be rescheduled.
-  *@param pipelineSpecificationWithVersions is the pipeline specification with already-fetched output versioning information.
+  *@param pipelineConnectionsWithVersions is the pipeline connections with already-fetched output versioning information.
   *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
   *@param identifierHash is the hashed document identifier.
   *@param componentHash is the hashed component identifier, if any.
@@ -223,7 +223,7 @@ public interface IIncrementalIngester
   *@throws IOException only if data stream throws an IOException.
   */
   public boolean documentIngest(
-    IPipelineSpecificationWithVersions pipelineSpecificationWithVersions,
+    IPipelineConnectionsWithVersions pipelineConnectionsWithVersions,
     String identifierClass, String identifierHash, String componentHash,
     String documentVersion,
     String authorityName,

Modified: manifoldcf/branches/CONNECTORS-1118/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-1118/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1644162&r1=1644161&r2=1644162&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-1118/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java (original)
+++ manifoldcf/branches/CONNECTORS-1118/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java Tue Dec  9 19:38:30 2014
@@ -76,6 +76,8 @@ public class WorkerThread extends Thread
       IJobManager jobManager = JobManagerFactory.make(threadContext);
       IBinManager binManager = BinManagerFactory.make(threadContext);
       IRepositoryConnectionManager connMgr = RepositoryConnectionManagerFactory.make(threadContext);
+      ITransformationConnectionManager transformationConnectionManager = TransformationConnectionManagerFactory.make(threadContext);
+      IOutputConnectionManager outputConnectionManager = OutputConnectionManagerFactory.make(threadContext);
       IReprioritizationTracker rt = ReprioritizationTrackerFactory.make(threadContext);
 
       IRepositoryConnectorPool repositoryConnectorPool = RepositoryConnectorPoolFactory.make(threadContext);
@@ -355,8 +357,8 @@ public class WorkerThread extends Thread
                     }
                     
                     ProcessActivity activity = new ProcessActivity(job.getID(),processID,
-                      threadContext,rt,jobManager,ingester,
-                      connectionName,pipelineSpecification,
+                      rt,jobManager,ingester,
+                      connectionName,pipelineSpecification,transformationConnectionManager,outputConnectionManager,
                       previousDocuments,
                       currentTime,
                       job.getExpiration(),
@@ -1050,6 +1052,181 @@ public class WorkerThread extends Thread
 
   // Nested classes
 
+  /** Pipeline connections implementation.
+  */
+  protected static class PipelineConnections implements IPipelineConnections
+  {
+    protected final IPipelineSpecification spec;
+    protected final String[] transformationConnectionNames;
+    protected final ITransformationConnection[] transformationConnections;
+    protected final String[] outputConnectionNames;
+    protected final IOutputConnection[] outputConnections;
+    // We need a way to get from stage index to connection index.
+    // These arrays are looked up by stage index, and return the appropriate connection index.
+    protected final Map<Integer,Integer> transformationConnectionLookupMap = new HashMap<Integer,Integer>();
+    protected final Map<Integer,Integer> outputConnectionLookupMap = new HashMap<Integer,Integer>();
+    
+    public PipelineConnections(ITransformationConnectionManager transformationConnectionManager,
+      IOutputConnectionManager outputConnectionManager, IPipelineSpecification spec)
+      throws ManifoldCFException
+    {
+      this.spec = spec;
+      IPipelineSpecificationBasic basicSpec = spec.getBasicPipelineSpecification();
+      // Now, load all the connections we'll ever need, being sure to only load one copy of each.
+      // We first segregate them into unique transformation and output connections.
+      int count = basicSpec.getStageCount();
+      Set<String> transformations = new HashSet<String>();
+      Set<String> outputs = new HashSet<String>();
+      for (int i = 0; i < count; i++)
+      {
+        if (basicSpec.checkStageOutputConnection(i))
+          outputs.add(basicSpec.getStageConnectionName(i));
+        else
+          transformations.add(basicSpec.getStageConnectionName(i));
+      }
+      
+      Map<String,Integer> transformationNameMap = new HashMap<String,Integer>();
+      Map<String,Integer> outputNameMap = new HashMap<String,Integer>();
+      transformationConnectionNames = new String[transformations.size()];
+      outputConnectionNames = new String[outputs.size()];
+      int index = 0;
+      for (String connectionName : transformations)
+      {
+        transformationConnectionNames[index] = connectionName;
+        transformationNameMap.put(connectionName,new Integer(index++));
+      }
+      index = 0;
+      for (String connectionName : outputs)
+      {
+        outputConnectionNames[index] = connectionName;
+        outputNameMap.put(connectionName,new Integer(index++));
+      }
+      // Load!
+      transformationConnections = transformationConnectionManager.loadMultiple(transformationConnectionNames);
+      outputConnections = outputConnectionManager.loadMultiple(outputConnectionNames);
+      
+      for (int i = 0; i < count; i++)
+      {
+        Integer k;
+        if (basicSpec.checkStageOutputConnection(i))
+        {
+          outputConnectionLookupMap.put(new Integer(i),outputNameMap.get(basicSpec.getStageConnectionName(i)));
+        }
+        else
+        {
+          transformationConnectionLookupMap.put(new Integer(i),transformationNameMap.get(basicSpec.getStageConnectionName(i)));
+        }
+      }
+    }
+    
+    @Override
+    public IPipelineSpecification getSpecification()
+    {
+      return spec;
+    }
+    
+    @Override
+    public String[] getTransformationConnectionNames()
+    {
+      return transformationConnectionNames;
+    }
+    
+    @Override
+    public ITransformationConnection[] getTransformationConnections()
+    {
+      return transformationConnections;
+    }
+    
+    @Override
+    public String[] getOutputConnectionNames()
+    {
+      return outputConnectionNames;
+    }
+    
+    @Override
+    public IOutputConnection[] getOutputConnections()
+    {
+      return outputConnections;
+    }
+    
+    @Override
+    public Integer getTransformationConnectionIndex(int stage)
+    {
+      return transformationConnectionLookupMap.get(new Integer(stage));
+    }
+    
+    @Override
+    public Integer getOutputConnectionIndex(int stage)
+    {
+      return outputConnectionLookupMap.get(new Integer(stage));
+    }
+    
+  }
+
+  /** IPipelineConnectionsWithVersions implementation.
+  */
+  protected static class PipelineConnectionsWithVersions implements IPipelineConnectionsWithVersions
+  {
+    protected final IPipelineConnections pipelineConnections;
+    protected final IPipelineSpecificationWithVersions pipelineSpecificationWithVersions;
+    
+    public PipelineConnectionsWithVersions(IPipelineConnections pipelineConnections, IPipelineSpecificationWithVersions pipelineSpecificationWithVersions)
+      throws ManifoldCFException
+    {
+      this.pipelineConnections = pipelineConnections;
+      this.pipelineSpecificationWithVersions = pipelineSpecificationWithVersions;
+    }
+    
+    @Override
+    public IPipelineSpecification getSpecification()
+    {
+      return pipelineConnections.getSpecification();
+    }
+    
+    @Override
+    public String[] getTransformationConnectionNames()
+    {
+      return pipelineConnections.getTransformationConnectionNames();
+    }
+    
+    @Override
+    public ITransformationConnection[] getTransformationConnections()
+    {
+      return pipelineConnections.getTransformationConnections();
+    }
+    
+    @Override
+    public String[] getOutputConnectionNames()
+    {
+      return pipelineConnections.getOutputConnectionNames();
+    }
+    
+    @Override
+    public IOutputConnection[] getOutputConnections()
+    {
+      return pipelineConnections.getOutputConnections();
+    }
+    
+    @Override
+    public Integer getTransformationConnectionIndex(int stage)
+    {
+      return pipelineConnections.getTransformationConnectionIndex(stage);
+    }
+    
+    @Override
+    public Integer getOutputConnectionIndex(int stage)
+    {
+      return pipelineConnections.getOutputConnectionIndex(stage);
+    }
+
+    @Override
+    public IPipelineSpecificationWithVersions getSpecificationWithVersions()
+    {
+      return pipelineSpecificationWithVersions;
+    }
+    
+  }
+
   /** Process activity class wraps access to the ingester and job queue.
   */
   protected static class ProcessActivity implements IProcessActivity
@@ -1057,11 +1234,12 @@ public class WorkerThread extends Thread
     // Member variables
     protected final Long jobID;
     protected final String processID;
-    protected final IThreadContext threadContext;
     protected final IJobManager jobManager;
     protected final IIncrementalIngester ingester;
     protected final String connectionName;
     protected final IPipelineSpecification pipelineSpecification;
+    protected final ITransformationConnectionManager transformationConnectionManager;
+    protected final IOutputConnectionManager outputConnectionManager;
     protected final Map<String,QueuedDocument> previousDocuments;
     protected final long currentTime;
     protected final Long expireInterval;
@@ -1074,6 +1252,9 @@ public class WorkerThread extends Thread
     protected final String[] legalLinkTypes;
     protected final OutputActivity ingestLogger;
     protected final IReprioritizationTracker rt;
+
+    protected IPipelineConnections pipelineConnections = null;
+    protected IPipelineConnectionsWithVersions pipelineConnectionsWithVersions = null;
     
     // We submit references in bulk, because that's way more efficient.
     protected final Map<DocumentReference,DocumentReference> referenceList = new HashMap<DocumentReference,DocumentReference>();
@@ -1104,16 +1285,23 @@ public class WorkerThread extends Thread
     // This represents primary documents.
     protected final Set<String> touchedPrimarySet = new HashSet<String>();
     
+    protected IPipelineConnections getPipelineConnections()
+      throws ManifoldCFException
+    {
+      if (pipelineConnections == null)
+        pipelineConnections = new PipelineConnections(transformationConnectionManager,outputConnectionManager,pipelineSpecification);
+      return pipelineConnections;
+    }
+    
     /** Constructor.
     *@param jobManager is the job manager
     *@param ingester is the ingester
     */
     public ProcessActivity(Long jobID, String processID,
-      IThreadContext threadContext,
       IReprioritizationTracker rt, IJobManager jobManager,
       IIncrementalIngester ingester,
       String connectionName,
-      IPipelineSpecification pipelineSpecification,
+      IPipelineSpecification pipelineSpecification, ITransformationConnectionManager transformationConnectionManager, IOutputConnectionManager outputConnectionManager,
       Map<String,QueuedDocument> previousDocuments,
       long currentTime,
       Long expireInterval,
@@ -1125,12 +1313,13 @@ public class WorkerThread extends Thread
     {
       this.jobID = jobID;
       this.processID = processID;
-      this.threadContext = threadContext;
       this.rt = rt;
       this.jobManager = jobManager;
       this.ingester = ingester;
       this.connectionName = connectionName;
       this.pipelineSpecification = pipelineSpecification;
+      this.transformationConnectionManager = transformationConnectionManager;
+      this.outputConnectionManager = outputConnectionManager;
       this.previousDocuments = previousDocuments;
       this.currentTime = currentTime;
       this.expireInterval = expireInterval;
@@ -1501,7 +1690,7 @@ public class WorkerThread extends Thread
       // indicates that it should always be refetched.  But I have no way to describe this situation
       // in the database at the moment.
       ingester.documentIngest(
-        computePipelineSpecification(documentIdentifierHash,componentIdentifierHash,documentIdentifier),
+        new PipelineConnectionsWithVersions(getPipelineConnections(),computePipelineSpecification(documentIdentifierHash,componentIdentifierHash,documentIdentifier)),
         connectionName,documentIdentifierHash,componentIdentifierHash,
         version,
         connection.getACLAuthority(),
@@ -1544,7 +1733,7 @@ public class WorkerThread extends Thread
       checkMultipleDispositions(documentIdentifier,componentIdentifier,componentIdentifierHash);
 
       ingester.documentNoData(
-        computePipelineSpecification(documentIdentifierHash,componentIdentifierHash,documentIdentifier),
+        new PipelineConnectionsWithVersions(getPipelineConnections(),computePipelineSpecification(documentIdentifierHash,componentIdentifierHash,documentIdentifier)),
         connectionName,documentIdentifierHash,componentIdentifierHash,
         version,
         connection.getACLAuthority(),
@@ -1934,7 +2123,7 @@ public class WorkerThread extends Thread
       throws ManifoldCFException, ServiceInterruption
     {
       return ingester.checkDateIndexable(
-        pipelineSpecification,date,
+        getPipelineConnections(),date,
         ingestLogger);
     }
 
@@ -1947,7 +2136,7 @@ public class WorkerThread extends Thread
       throws ManifoldCFException, ServiceInterruption
     {
       return ingester.checkMimeTypeIndexable(
-        pipelineSpecification,mimeType,
+        getPipelineConnections(),mimeType,
         ingestLogger);
     }
 
@@ -1960,7 +2149,7 @@ public class WorkerThread extends Thread
       throws ManifoldCFException, ServiceInterruption
     {
       return ingester.checkDocumentIndexable(
-        pipelineSpecification,localFile,
+        getPipelineConnections(),localFile,
         ingestLogger);
     }
 
@@ -1973,7 +2162,7 @@ public class WorkerThread extends Thread
       throws ManifoldCFException, ServiceInterruption
     {
       return ingester.checkLengthIndexable(
-        pipelineSpecification,length,
+        getPipelineConnections(),length,
         ingestLogger);
     }
 
@@ -1987,7 +2176,7 @@ public class WorkerThread extends Thread
       throws ManifoldCFException, ServiceInterruption
     {
       return ingester.checkURLIndexable(
-        pipelineSpecification,url,
+        getPipelineConnections(),url,
         ingestLogger);
     }