You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/12/09 22:51:46 UTC

svn commit: r1644200 - in /manifoldcf/branches/dev_1x: ./ framework/ framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/ framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/ framework/pull-agent/src/main/ja...

Author: kwright
Date: Tue Dec  9 21:51:46 2014
New Revision: 1644200

URL: http://svn.apache.org/r1644200
Log:
Fix for CONNECTORS-1118. Warning: change to the IIncrementalIngester API.

Added:
    manifoldcf/branches/dev_1x/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineConnections.java
      - copied unchanged from r1644197, manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineConnections.java
    manifoldcf/branches/dev_1x/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineConnectionsWithVersions.java
      - copied unchanged from r1644197, manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineConnectionsWithVersions.java
Modified:
    manifoldcf/branches/dev_1x/   (props changed)
    manifoldcf/branches/dev_1x/CHANGES.txt
    manifoldcf/branches/dev_1x/framework/   (props changed)
    manifoldcf/branches/dev_1x/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
    manifoldcf/branches/dev_1x/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java
    manifoldcf/branches/dev_1x/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java

Propchange: manifoldcf/branches/dev_1x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec  9 21:51:46 2014
@@ -42,6 +42,7 @@
 /manifoldcf/branches/CONNECTORS-1089:1635610-1635937
 /manifoldcf/branches/CONNECTORS-1100:1637693-1640317
 /manifoldcf/branches/CONNECTORS-1104:1640149-1640198
+/manifoldcf/branches/CONNECTORS-1118:1644108-1644196
 /manifoldcf/branches/CONNECTORS-120:1406712-1407974,1407982-1411043,1411049-1416451
 /manifoldcf/branches/CONNECTORS-120-1:1416450-1417056
 /manifoldcf/branches/CONNECTORS-13:1525862-1527182,1539324-1541634
@@ -116,4 +117,4 @@
 /manifoldcf/branches/CONNECTORS-981:1605049-1605773
 /manifoldcf/branches/CONNECTORS-989:1611600-1612101
 /manifoldcf/branches/CONNECTORS-990:1610284-1610707
-/manifoldcf/trunk
 4202,1634264,1634373,1634530,1634688,1634850,1634857,1635103,1635116,1635421,1635438,1635478,1635481,1635484,1635490,1635809,1635939,1636146,1636167,1636180,1636207,1636215,1636232,1636334,1636519,1636570,1636684,1636940,1637011,1637310,1637350,1637364,1637373,1637378,1639259,1639593,1639600,1640018,1640101,1640199,1640204,1640208,1640314,1640319,1640749,1640772,1640805,1640888,1640925,1640941-1640942,1641222,1641328,1641557,1641559,1641629,1641633,1641724,1641754,1641911,1642163,1642255,1642318,1642531,1642650,1642658,1642673,1642716
+/manifoldcf/trunk:1620703,1620748,1620812,1620862,1621449,1621613,1621855,1622213,1622740,1622850,1622853-1622854,1623249,1623251,1623314,1623599,1623951,1623953-1623954,1623956,1623972,1624058,1624085,1624174,1624236,1624377,1624384,1624399,1624449,1624464,1624504,1624729-1624731,1624906,1624909-1624910,1625023,1625095,1625103,1625108,1625264,1625270,1625394,1625400,1625910,1626090,1626097,1626102,1626638-1626639,1626973,1627687,1627690,1627959,1628066,1628106,1628168,1628188,1628699,1628798,1628808,1628845,1628905,1629122,1629374-1629375,1629379,1629541,1629994,1630188,1630535,1630623,1630671,1630812,1630885,1631039,1631162,1631164,1631252,1631750,1631953,1632013,1632225,1632289,1632562,1632844,1632847,1632854,1633062-1633063,1633108,1633193,1633202,1633282,1633284,1633295,1633336,1633339,1633345,1633348,1633364,1633378,1633383,1633432,1633546,1633590,1633634,1633668,1633727,1633760,1633764,1633786,1633910,1633923,1634021,1634028,1634067,1634132,1634145,1634148,1634155,1634188,163
 4202,1634264,1634373,1634530,1634688,1634850,1634857,1635103,1635116,1635421,1635438,1635478,1635481,1635484,1635490,1635809,1635939,1636146,1636167,1636180,1636207,1636215,1636232,1636334,1636519,1636570,1636684,1636940,1637011,1637310,1637350,1637364,1637373,1637378,1639259,1639593,1639600,1640018,1640101,1640199,1640204,1640208,1640314,1640319,1640749,1640772,1640805,1640888,1640925,1640941-1640942,1641222,1641328,1641557,1641559,1641629,1641633,1641724,1641754,1641911,1642163,1642255,1642318,1642531,1642650,1642658,1642673,1642716,1644197

Modified: manifoldcf/branches/dev_1x/CHANGES.txt
URL: http://svn.apache.org/viewvc/manifoldcf/branches/dev_1x/CHANGES.txt?rev=1644200&r1=1644199&r2=1644200&view=diff
==============================================================================
--- manifoldcf/branches/dev_1x/CHANGES.txt (original)
+++ manifoldcf/branches/dev_1x/CHANGES.txt Tue Dec  9 21:51:46 2014
@@ -3,6 +3,10 @@ $Id$
 
 ======================= 1.8-dev =====================
 
+CONNECTORS-1118: Change IIncrementalIngester interface to allow
+for cached connection instances.
+(Aeham Abushwashi, Karl Wright)
+
 CONNECTORS-974: Make SharePoint 2010 be the default selection.
 (Karl Wright)
 

Propchange: manifoldcf/branches/dev_1x/framework/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec  9 21:51:46 2014
@@ -112,4 +112,4 @@
 /manifoldcf/branches/CONNECTORS-989/framework:1611600-1612101
 /manifoldcf/branches/CONNECTORS-990/framework:1610284-1610707
 /manifoldcf/trunk:1629122
-/manifoldcf/trunk/framework
 642163,1642255,1642318
+/manifoldcf/trunk/framework
 642163,1642255,1642318,1644197

Modified: manifoldcf/branches/dev_1x/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/dev_1x/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java?rev=1644200&r1=1644199&r2=1644200&view=diff
==============================================================================
--- manifoldcf/branches/dev_1x/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java (original)
+++ manifoldcf/branches/dev_1x/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java Tue Dec  9 21:51:46 2014
@@ -256,20 +256,19 @@ public class IncrementalIngester extends
   }
 
   /** Check if a date is indexable.
-  *@param pipelineSpecification is the pipeline specification.
+  *@param pipelineConnections is the IPipelineConnections object for this pipeline.
   *@param date is the date to check.
   *@param activity are the activities available to this method.
   *@return true if the mimeType is indexable.
   */
   @Override
   public boolean checkDateIndexable(
-    IPipelineSpecification pipelineSpecification,
+    IPipelineConnections pipelineConnections,
     Date date,
     IOutputCheckActivity activity)
     throws ManifoldCFException, ServiceInterruption
   {
-    PipelineObject pipeline = pipelineGrab(
-      new PipelineConnections(pipelineSpecification));
+    PipelineObject pipeline = pipelineGrab(pipelineConnections);
     if (pipeline == null)
       // A connector is not installed; treat this as a service interruption.
       throw new ServiceInterruption("One or more connectors are not installed",0L);
@@ -284,20 +283,19 @@ public class IncrementalIngester extends
   }
 
   /** Check if a mime type is indexable.
-  *@param pipelineSpecification is the pipeline specification.
+  *@param pipelineConnections is the pipeline connections object for this pipeline.
   *@param mimeType is the mime type to check.
   *@param activity are the activities available to this method.
   *@return true if the mimeType is indexable.
   */
   @Override
   public boolean checkMimeTypeIndexable(
-    IPipelineSpecification pipelineSpecification,
+    IPipelineConnections pipelineConnections,
     String mimeType,
     IOutputCheckActivity activity)
     throws ManifoldCFException, ServiceInterruption
   {
-    PipelineObject pipeline = pipelineGrab(
-      new PipelineConnections(pipelineSpecification));
+    PipelineObject pipeline = pipelineGrab(pipelineConnections);
     if (pipeline == null)
       // A connector is not installed; treat this as a service interruption.
       throw new ServiceInterruption("One or more connectors are not installed",0L);
@@ -312,20 +310,19 @@ public class IncrementalIngester extends
   }
 
   /** Check if a file is indexable.
-  *@param pipelineSpecification is the pipeline specification.
+  *@param pipelineConnections is the pipeline connections object for this pipeline.
   *@param localFile is the local file to check.
   *@param activity are the activities available to this method.
   *@return true if the local file is indexable.
   */
   @Override
   public boolean checkDocumentIndexable(
-    IPipelineSpecification pipelineSpecification,
+    IPipelineConnections pipelineConnections,
     File localFile,
     IOutputCheckActivity activity)
     throws ManifoldCFException, ServiceInterruption
   {
-    PipelineObject pipeline = pipelineGrab(
-      new PipelineConnections(pipelineSpecification));
+    PipelineObject pipeline = pipelineGrab(pipelineConnections);
     if (pipeline == null)
       // A connector is not installed; treat this as a service interruption.
       throw new ServiceInterruption("One or more connectors are not installed",0L);
@@ -341,20 +338,19 @@ public class IncrementalIngester extends
 
   /** Pre-determine whether a document's length is indexable by this connector.  This method is used by participating repository connectors
   * to help filter out documents that are too long to be indexable.
-  *@param pipelineSpecification is the pipeline specification.
+  *@param pipelineConnections is the pipeline connections object for this pipeline.
   *@param length is the length of the document.
   *@param activity are the activities available to this method.
   *@return true if the file is indexable.
   */
   @Override
   public boolean checkLengthIndexable(
-    IPipelineSpecification pipelineSpecification,
+    IPipelineConnections pipelineConnections,
     long length,
     IOutputCheckActivity activity)
     throws ManifoldCFException, ServiceInterruption
   {
-    PipelineObject pipeline = pipelineGrab(
-      new PipelineConnections(pipelineSpecification));
+    PipelineObject pipeline = pipelineGrab(pipelineConnections);
     if (pipeline == null)
       // A connector is not installed; treat this as a service interruption.
       throw new ServiceInterruption("One or more connectors are not installed",0L);
@@ -370,20 +366,19 @@ public class IncrementalIngester extends
 
   /** Pre-determine whether a document's URL is indexable by this connector.  This method is used by participating repository connectors
   * to help filter out documents that not indexable.
-  *@param pipelineSpecification is the pipeline specification.
+  *@param pipelineConnections is the pipeline connections object for this pipeline.
   *@param url is the url of the document.
   *@param activity are the activities available to this method.
   *@return true if the file is indexable.
   */
   @Override
   public boolean checkURLIndexable(
-    IPipelineSpecification pipelineSpecification,
+    IPipelineConnections pipelineConnections,
     String url,
     IOutputCheckActivity activity)
     throws ManifoldCFException, ServiceInterruption
   {
-    PipelineObject pipeline = pipelineGrab(
-      new PipelineConnections(pipelineSpecification));
+    PipelineObject pipeline = pipelineGrab(pipelineConnections);
     if (pipeline == null)
       // A connector is not installed; treat this as a service interruption.
       throw new ServiceInterruption("One or more connectors are not installed",0L);
@@ -404,7 +399,7 @@ public class IncrementalIngester extends
   *@param outputDescriptionString - the output description string
   *@return the pipeline description, or null if any part of the pipeline cannot be grabbed.
   */
-  protected PipelineObjectWithVersions pipelineGrabWithVersions(PipelineConnectionsWithVersions pipelineConnections)
+  protected PipelineObjectWithVersions pipelineGrabWithVersions(IPipelineConnectionsWithVersions pipelineConnections)
     throws ManifoldCFException
   {
     // Pick up all needed transformation connectors
@@ -454,7 +449,7 @@ public class IncrementalIngester extends
   *@param outputDescriptionString - the output description string
   *@return the pipeline description, or null if any part of the pipeline cannot be grabbed.
   */
-  protected PipelineObject pipelineGrab(PipelineConnections pipelineConnections)
+  protected PipelineObject pipelineGrab(IPipelineConnections pipelineConnections)
     throws ManifoldCFException
   {
     // Pick up all needed transformation connectors
@@ -711,7 +706,7 @@ public class IncrementalIngester extends
   * This method is conceptually similar to documentIngest(), but does not actually take
   * a document or allow it to be transformed.  If there is a document already
   * indexed, it is removed from the index.
-  *@param pipelineSpecificationWithVersions is the pipeline specification with already-fetched output versioning information.
+  *@param pipelineConnectionsWithVersions is the pipeline connections with already-fetched output versioning information.
   *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
   *@param identifierHash is the hashed document identifier.
   *@param componentHash is the hashed component identifier, if any.
@@ -723,7 +718,7 @@ public class IncrementalIngester extends
   */
   @Override
   public void documentNoData(
-    IPipelineSpecificationWithVersions pipelineSpecificationWithVersions,
+    IPipelineConnectionsWithVersions pipelineConnectionsWithVersions,
     String identifierClass, String identifierHash, String componentHash,
     String documentVersion,
     String parameterVersion,
@@ -732,13 +727,11 @@ public class IncrementalIngester extends
     IOutputActivity activities)
     throws ManifoldCFException, ServiceInterruption
   {
-    PipelineConnectionsWithVersions pipelineConnectionsWithVersions = new PipelineConnectionsWithVersions(pipelineSpecificationWithVersions);
-    
     String docKey = makeKey(identifierClass,identifierHash);
 
     if (Logging.ingest.isDebugEnabled())
     {
-      Logging.ingest.debug("Logging empty document '"+docKey+"' component hash "+((componentHash==null)?"(None)":("'"+componentHash+"'"))+" into output connections '"+extractOutputConnectionNames(pipelineSpecificationWithVersions.getPipelineSpecification().getBasicPipelineSpecification())+"'");
+      Logging.ingest.debug("Logging empty document '"+docKey+"' component hash "+((componentHash==null)?"(None)":("'"+componentHash+"'"))+" into output connections '"+extractOutputConnectionNames(pipelineConnectionsWithVersions.getSpecification().getBasicPipelineSpecification())+"'");
     }
 
     // Set up a pipeline
@@ -761,7 +754,7 @@ public class IncrementalIngester extends
   * method also REMOVES ALL OLD METADATA.  When complete, the index will contain only the metadata
   * described by the RepositoryDocument object passed to this method.
   * ServiceInterruption is thrown if the document ingestion must be rescheduled.
-  *@param pipelineSpecificationWithVersions is the pipeline specification with already-fetched output versioning information.
+  *@param pipelineConnectionsWithVersions is the pipeline connections with already-fetched output versioning information.
   *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
   *@param identifierHash is the hashed document identifier.
   *@param componentHash is the hashed component identifier, if any.
@@ -777,7 +770,7 @@ public class IncrementalIngester extends
   */
   @Override
   public boolean documentIngest(
-    IPipelineSpecificationWithVersions pipelineSpecificationWithVersions,
+    IPipelineConnectionsWithVersions pipelineConnectionsWithVersions,
     String identifierClass, String identifierHash, String componentHash,
     String documentVersion,
     String parameterVersion,
@@ -787,13 +780,11 @@ public class IncrementalIngester extends
     IOutputActivity activities)
     throws ManifoldCFException, ServiceInterruption, IOException
   {
-    PipelineConnectionsWithVersions pipelineConnectionsWithVersions = new PipelineConnectionsWithVersions(pipelineSpecificationWithVersions);
-    
     String docKey = makeKey(identifierClass,identifierHash);
 
     if (Logging.ingest.isDebugEnabled())
     {
-      Logging.ingest.debug("Ingesting document '"+docKey+"' component hash "+((componentHash==null)?"(None)":("'"+componentHash+"'"))+" into output connections '"+extractOutputConnectionNames(pipelineSpecificationWithVersions.getPipelineSpecification().getBasicPipelineSpecification())+"'");
+      Logging.ingest.debug("Ingesting document '"+docKey+"' component hash "+((componentHash==null)?"(None)":("'"+componentHash+"'"))+" into output connections '"+extractOutputConnectionNames(pipelineConnectionsWithVersions.getSpecification().getBasicPipelineSpecification())+"'");
     }
 
     // Set indexing date
@@ -2607,12 +2598,12 @@ public class IncrementalIngester extends
   
   protected class PipelineObject
   {
-    public final PipelineConnections pipelineConnections;
+    public final IPipelineConnections pipelineConnections;
     public final IOutputConnector[] outputConnectors;
     public final ITransformationConnector[] transformationConnectors;
     
     public PipelineObject(
-      PipelineConnections pipelineConnections,
+      IPipelineConnections pipelineConnections,
       ITransformationConnector[] transformationConnectors,
       IOutputConnector[] outputConnectors)
     {
@@ -2738,10 +2729,10 @@ public class IncrementalIngester extends
   
   protected class PipelineObjectWithVersions extends PipelineObject
   {
-    protected final PipelineConnectionsWithVersions pipelineConnectionsWithVersions;
+    protected final IPipelineConnectionsWithVersions pipelineConnectionsWithVersions;
     
     public PipelineObjectWithVersions(
-      PipelineConnectionsWithVersions pipelineConnectionsWithVersions,
+      IPipelineConnectionsWithVersions pipelineConnectionsWithVersions,
       ITransformationConnector[] transformationConnectors,
       IOutputConnector[] outputConnectors)
     {
@@ -3700,127 +3691,6 @@ public class IncrementalIngester extends
 
   }
   
-  /** This class caches loaded connections corresponding to a pipeline specification.
-  */
-  protected class PipelineConnections
-  {
-    protected final IPipelineSpecification spec;
-    protected final String[] transformationConnectionNames;
-    protected final ITransformationConnection[] transformationConnections;
-    protected final String[] outputConnectionNames;
-    protected final IOutputConnection[] outputConnections;
-    // We need a way to get from stage index to connection index.
-    // These arrays are looked up by stage index, and return the appropriate connection index.
-    protected final Map<Integer,Integer> transformationConnectionLookupMap = new HashMap<Integer,Integer>();
-    protected final Map<Integer,Integer> outputConnectionLookupMap = new HashMap<Integer,Integer>();
-    
-    public PipelineConnections(IPipelineSpecification spec)
-      throws ManifoldCFException
-    {
-      this.spec = spec;
-      IPipelineSpecificationBasic basicSpec = spec.getBasicPipelineSpecification();
-      // Now, load all the connections we'll ever need, being sure to only load one copy of each.
-      // We first segregate them into unique transformation and output connections.
-      int count = basicSpec.getStageCount();
-      Set<String> transformations = new HashSet<String>();
-      Set<String> outputs = new HashSet<String>();
-      for (int i = 0; i < count; i++)
-      {
-        if (basicSpec.checkStageOutputConnection(i))
-          outputs.add(basicSpec.getStageConnectionName(i));
-        else
-          transformations.add(basicSpec.getStageConnectionName(i));
-      }
-      
-      Map<String,Integer> transformationNameMap = new HashMap<String,Integer>();
-      Map<String,Integer> outputNameMap = new HashMap<String,Integer>();
-      transformationConnectionNames = new String[transformations.size()];
-      outputConnectionNames = new String[outputs.size()];
-      int index = 0;
-      for (String connectionName : transformations)
-      {
-        transformationConnectionNames[index] = connectionName;
-        transformationNameMap.put(connectionName,new Integer(index++));
-      }
-      index = 0;
-      for (String connectionName : outputs)
-      {
-        outputConnectionNames[index] = connectionName;
-        outputNameMap.put(connectionName,new Integer(index++));
-      }
-      // Load!
-      transformationConnections = transformationConnectionManager.loadMultiple(transformationConnectionNames);
-      outputConnections = connectionManager.loadMultiple(outputConnectionNames);
-      
-      for (int i = 0; i < count; i++)
-      {
-        Integer k;
-        if (basicSpec.checkStageOutputConnection(i))
-        {
-          outputConnectionLookupMap.put(new Integer(i),outputNameMap.get(basicSpec.getStageConnectionName(i)));
-        }
-        else
-        {
-          transformationConnectionLookupMap.put(new Integer(i),transformationNameMap.get(basicSpec.getStageConnectionName(i)));
-        }
-      }
-    }
-    
-    public IPipelineSpecification getSpecification()
-    {
-      return spec;
-    }
-    
-    public String[] getTransformationConnectionNames()
-    {
-      return transformationConnectionNames;
-    }
-    
-    public ITransformationConnection[] getTransformationConnections()
-    {
-      return transformationConnections;
-    }
-    
-    public String[] getOutputConnectionNames()
-    {
-      return outputConnectionNames;
-    }
-    
-    public IOutputConnection[] getOutputConnections()
-    {
-      return outputConnections;
-    }
-    
-    public Integer getTransformationConnectionIndex(int stage)
-    {
-      return transformationConnectionLookupMap.get(new Integer(stage));
-    }
-    
-    public Integer getOutputConnectionIndex(int stage)
-    {
-      return outputConnectionLookupMap.get(new Integer(stage));
-    }
-    
-  }
-
-  protected class PipelineConnectionsWithVersions extends PipelineConnections
-  {
-    protected final IPipelineSpecificationWithVersions pipelineSpecificationWithVersions;
-    
-    public PipelineConnectionsWithVersions(IPipelineSpecificationWithVersions pipelineSpecificationWithVersions)
-      throws ManifoldCFException
-    {
-      super(pipelineSpecificationWithVersions.getPipelineSpecification());
-      this.pipelineSpecificationWithVersions = pipelineSpecificationWithVersions;
-    }
-    
-    public IPipelineSpecificationWithVersions getSpecificationWithVersions()
-    {
-      return pipelineSpecificationWithVersions;
-    }
-    
-  }
-  
   /** This class passes everything through, and monitors what happens so that the
   * framework can compensate for any transformation connector coding errors.
   */

Modified: manifoldcf/branches/dev_1x/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/dev_1x/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java?rev=1644200&r1=1644199&r2=1644200&view=diff
==============================================================================
--- manifoldcf/branches/dev_1x/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java (original)
+++ manifoldcf/branches/dev_1x/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java Tue Dec  9 21:51:46 2014
@@ -91,63 +91,63 @@ public interface IIncrementalIngester
     throws ManifoldCFException, ServiceInterruption;
 
   /** Check if a document date is indexable.
-  *@param pipelineSpecification is the pipeline specification.
+  *@param pipelineConnections is the IPipelineConnections object for this pipeline.
   *@param date is the date to check
   *@param activity are the activities available to this method.
   *@return true if the document with that date is indexable.
   */
   public boolean checkDateIndexable(
-    IPipelineSpecification pipelineSpecification,
+    IPipelineConnections pipelineConnections,
     Date date,
     IOutputCheckActivity activity)
     throws ManifoldCFException, ServiceInterruption;
 
   /** Check if a mime type is indexable.
-  *@param pipelineSpecification is the pipeline specification.
+  *@param pipelineConnections is the pipeline connections object for this pipeline.
   *@param mimeType is the mime type to check.
   *@param activity are the activities available to this method.
   *@return true if the mimeType is indexable.
   */
   public boolean checkMimeTypeIndexable(
-    IPipelineSpecification pipelineSpecification,
+    IPipelineConnections pipelineConnections,
     String mimeType,
     IOutputCheckActivity activity)
     throws ManifoldCFException, ServiceInterruption;
 
   /** Check if a file is indexable.
-  *@param pipelineSpecification is the pipeline specification.
+  *@param pipelineConnections is the pipeline connections object for this pipeline.
   *@param localFile is the local file to check.
   *@param activity are the activities available to this method.
   *@return true if the local file is indexable.
   */
   public boolean checkDocumentIndexable(
-    IPipelineSpecification pipelineSpecification,
+    IPipelineConnections pipelineConnections,
     File localFile,
     IOutputCheckActivity activity)
     throws ManifoldCFException, ServiceInterruption;
 
   /** Pre-determine whether a document's length is indexable by this connector.  This method is used by participating repository connectors
   * to help filter out documents that are too long to be indexable.
-  *@param pipelineSpecification is the pipeline specification.
+  *@param pipelineConnections is the pipeline connections object for this pipeline.
   *@param length is the length of the document.
   *@param activity are the activities available to this method.
   *@return true if the file is indexable.
   */
   public boolean checkLengthIndexable(
-    IPipelineSpecification pipelineSpecification,
+    IPipelineConnections pipelineConnections,
     long length,
     IOutputCheckActivity activity)
     throws ManifoldCFException, ServiceInterruption;
 
   /** Pre-determine whether a document's URL is indexable by this connector.  This method is used by participating repository connectors
   * to help filter out documents that not indexable.
-  *@param pipelineSpecification is the pipeline specification.
+  *@param pipelineConnections is the pipeline connections object for this pipeline.
   *@param url is the url of the document.
   *@param activity are the activities available to this method.
   *@return true if the file is indexable.
   */
   public boolean checkURLIndexable(
-    IPipelineSpecification pipelineSpecification,
+    IPipelineConnections pipelineConnections,
     String url,
     IOutputCheckActivity activity)
     throws ManifoldCFException, ServiceInterruption;
@@ -188,7 +188,7 @@ public interface IIncrementalIngester
   * This method is conceptually similar to documentIngest(), but does not actually take
   * a document or allow it to be transformed.  If there is a document already
   * indexed, it is removed from the index.
-  *@param pipelineSpecificationWithVersions is the pipeline specification with already-fetched output versioning information.
+  *@param pipelineConnectionsWithVersions is the pipeline connections with already-fetched output versioning information.
   *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
   *@param identifierHash is the hashed document identifier.
   *@param componentHash is the hashed component identifier, if any.
@@ -199,7 +199,7 @@ public interface IIncrementalIngester
   *@param activities is an object providing a set of methods that the implementer can use to perform the operation.
   */
   public void documentNoData(
-    IPipelineSpecificationWithVersions pipelineSpecificationWithVersions,
+    IPipelineConnectionsWithVersions pipelineConnectionsWithVersions,
     String identifierClass, String identifierHash, String componentHash,
     String documentVersion,
     String parameterVersion,
@@ -213,7 +213,7 @@ public interface IIncrementalIngester
   * method also REMOVES ALL OLD METADATA.  When complete, the index will contain only the metadata
   * described by the RepositoryDocument object passed to this method.
   * ServiceInterruption is thrown if the document ingestion must be rescheduled.
-  *@param pipelineSpecificationWithVersions is the pipeline specification with already-fetched output versioning information.
+  *@param pipelineConnectionsWithVersions is the pipeline connections with already-fetched output versioning information.
   *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
   *@param identifierHash is the hashed document identifier.
   *@param componentHash is the hashed component identifier, if any.
@@ -228,7 +228,7 @@ public interface IIncrementalIngester
   *@throws IOException only if data stream throws an IOException.
   */
   public boolean documentIngest(
-    IPipelineSpecificationWithVersions pipelineSpecificationWithVersions,
+    IPipelineConnectionsWithVersions pipelineConnectionsWithVersions,
     String identifierClass, String identifierHash, String componentHash,
     String documentVersion,
     String parameterVersion,

Modified: manifoldcf/branches/dev_1x/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/dev_1x/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1644200&r1=1644199&r2=1644200&view=diff
==============================================================================
--- manifoldcf/branches/dev_1x/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java (original)
+++ manifoldcf/branches/dev_1x/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java Tue Dec  9 21:51:46 2014
@@ -76,6 +76,8 @@ public class WorkerThread extends Thread
       IJobManager jobManager = JobManagerFactory.make(threadContext);
       IBinManager binManager = BinManagerFactory.make(threadContext);
       IRepositoryConnectionManager connMgr = RepositoryConnectionManagerFactory.make(threadContext);
+      ITransformationConnectionManager transformationConnectionManager = TransformationConnectionManagerFactory.make(threadContext);
+      IOutputConnectionManager outputConnectionManager = OutputConnectionManagerFactory.make(threadContext);
       IReprioritizationTracker rt = ReprioritizationTrackerFactory.make(threadContext);
 
       IRepositoryConnectorPool repositoryConnectorPool = RepositoryConnectorPoolFactory.make(threadContext);
@@ -357,8 +359,8 @@ public class WorkerThread extends Thread
                     }
                     
                     ProcessActivity activity = new ProcessActivity(job.getID(),processID,
-                      threadContext,rt,jobManager,ingester,
-                      connectionName,pipelineSpecification,
+                      rt,jobManager,ingester,
+                      connectionName,pipelineSpecification,transformationConnectionManager,outputConnectionManager,
                       previousDocuments,
                       currentTime,
                       job.getExpiration(),
@@ -1108,6 +1110,181 @@ public class WorkerThread extends Thread
 
   // Nested classes
 
+  /** Pipeline connections implementation.  Loads every distinct transformation and output connection named by a
+  * pipeline specification once, and records which loaded connection each pipeline stage uses. */
+  protected static class PipelineConnections implements IPipelineConnections
+  {
+    protected final IPipelineSpecification spec;
+    protected final String[] transformationConnectionNames;
+    protected final ITransformationConnection[] transformationConnections;
+    protected final String[] outputConnectionNames;
+    protected final IOutputConnection[] outputConnections;
+    // We need a way to get from stage index to connection index.
+    // These maps are keyed by stage index, and yield the index into the matching names/connections arrays above.
+    protected final Map<Integer,Integer> transformationConnectionLookupMap = new HashMap<Integer,Integer>();
+    protected final Map<Integer,Integer> outputConnectionLookupMap = new HashMap<Integer,Integer>();
+    /** Constructor.  Loads the connections referenced by spec through the supplied managers (one loadMultiple call each). */
+    public PipelineConnections(ITransformationConnectionManager transformationConnectionManager,
+      IOutputConnectionManager outputConnectionManager, IPipelineSpecification spec)
+      throws ManifoldCFException
+    {
+      this.spec = spec;
+      IPipelineSpecificationBasic basicSpec = spec.getBasicPipelineSpecification();
+      // Now, load all the connections we'll ever need, being sure to only load one copy of each.
+      // We first segregate the stage connection names into unique transformation and output names.
+      int count = basicSpec.getStageCount();
+      Set<String> transformations = new HashSet<String>();
+      Set<String> outputs = new HashSet<String>();
+      for (int i = 0; i < count; i++)
+      {
+        if (basicSpec.checkStageOutputConnection(i))
+          outputs.add(basicSpec.getStageConnectionName(i));
+        else
+          transformations.add(basicSpec.getStageConnectionName(i));
+      }
+      
+      Map<String,Integer> transformationNameMap = new HashMap<String,Integer>();
+      Map<String,Integer> outputNameMap = new HashMap<String,Integer>();
+      transformationConnectionNames = new String[transformations.size()];
+      outputConnectionNames = new String[outputs.size()];
+      int index = 0;
+      for (String connectionName : transformations)
+      {
+        transformationConnectionNames[index] = connectionName;
+        transformationNameMap.put(connectionName,Integer.valueOf(index++));
+      }
+      index = 0;
+      for (String connectionName : outputs)
+      {
+        outputConnectionNames[index] = connectionName;
+        outputNameMap.put(connectionName,Integer.valueOf(index++));
+      }
+      // Load each distinct connection; results are presumably returned in the same order as the names passed in.
+      transformationConnections = transformationConnectionManager.loadMultiple(transformationConnectionNames);
+      outputConnections = outputConnectionManager.loadMultiple(outputConnectionNames);
+      
+      for (int i = 0; i < count; i++)
+      {
+        // Each stage is entered into exactly one lookup map, chosen by whether it is an output stage.
+        if (basicSpec.checkStageOutputConnection(i))
+        {
+          outputConnectionLookupMap.put(Integer.valueOf(i),outputNameMap.get(basicSpec.getStageConnectionName(i)));
+        }
+        else
+        {
+          transformationConnectionLookupMap.put(Integer.valueOf(i),transformationNameMap.get(basicSpec.getStageConnectionName(i)));
+        }
+      }
+    }
+    /** Returns the pipeline specification that was passed to the constructor. */
+    @Override
+    public IPipelineSpecification getSpecification()
+    {
+      return spec;
+    }
+    /** Returns the distinct transformation connection names (the internal array, not a copy). */
+    @Override
+    public String[] getTransformationConnectionNames()
+    {
+      return transformationConnectionNames;
+    }
+    /** Returns the transformation connections loaded for those names (the internal array, not a copy). */
+    @Override
+    public ITransformationConnection[] getTransformationConnections()
+    {
+      return transformationConnections;
+    }
+    /** Returns the distinct output connection names (the internal array, not a copy). */
+    @Override
+    public String[] getOutputConnectionNames()
+    {
+      return outputConnectionNames;
+    }
+    /** Returns the output connections loaded for those names (the internal array, not a copy). */
+    @Override
+    public IOutputConnection[] getOutputConnections()
+    {
+      return outputConnections;
+    }
+    /** Returns the transformation connection index for a stage, or null if none was recorded for that stage. */
+    @Override
+    public Integer getTransformationConnectionIndex(int stage)
+    {
+      return transformationConnectionLookupMap.get(Integer.valueOf(stage));
+    }
+    /** Returns the output connection index for a stage, or null if none was recorded for that stage. */
+    @Override
+    public Integer getOutputConnectionIndex(int stage)
+    {
+      return outputConnectionLookupMap.get(Integer.valueOf(stage));
+    }
+    
+  }
+
+  /** IPipelineConnectionsWithVersions implementation.  Pairs an already-built IPipelineConnections with an
+  * IPipelineSpecificationWithVersions; every IPipelineConnections method simply forwards to the wrapped object. */
+  protected static class PipelineConnectionsWithVersions implements IPipelineConnectionsWithVersions
+  {
+    protected final IPipelineConnections pipelineConnections;
+    protected final IPipelineSpecificationWithVersions pipelineSpecificationWithVersions;
+    /** Constructor.  Stores both arguments as given; nothing in this body throws the declared ManifoldCFException. */
+    public PipelineConnectionsWithVersions(IPipelineConnections pipelineConnections, IPipelineSpecificationWithVersions pipelineSpecificationWithVersions)
+      throws ManifoldCFException
+    {
+      this.pipelineConnections = pipelineConnections;
+      this.pipelineSpecificationWithVersions = pipelineSpecificationWithVersions;
+    }
+    // IPipelineConnections methods follow; each one delegates unchanged to the wrapped pipelineConnections.
+    @Override
+    public IPipelineSpecification getSpecification()
+    {
+      return pipelineConnections.getSpecification();
+    }
+    
+    @Override
+    public String[] getTransformationConnectionNames()
+    {
+      return pipelineConnections.getTransformationConnectionNames();
+    }
+    
+    @Override
+    public ITransformationConnection[] getTransformationConnections()
+    {
+      return pipelineConnections.getTransformationConnections();
+    }
+    
+    @Override
+    public String[] getOutputConnectionNames()
+    {
+      return pipelineConnections.getOutputConnectionNames();
+    }
+    
+    @Override
+    public IOutputConnection[] getOutputConnections()
+    {
+      return pipelineConnections.getOutputConnections();
+    }
+    
+    @Override
+    public Integer getTransformationConnectionIndex(int stage)
+    {
+      return pipelineConnections.getTransformationConnectionIndex(stage);
+    }
+    
+    @Override
+    public Integer getOutputConnectionIndex(int stage)
+    {
+      return pipelineConnections.getOutputConnectionIndex(stage);
+    }
+    /** Returns the versioned pipeline specification that was supplied to the constructor. */
+    @Override
+    public IPipelineSpecificationWithVersions getSpecificationWithVersions()
+    {
+      return pipelineSpecificationWithVersions;
+    }
+    
+  }
+
   /** Process activity class wraps access to the ingester and job queue.
   */
   protected static class ProcessActivity implements IProcessActivity
@@ -1115,11 +1292,12 @@ public class WorkerThread extends Thread
     // Member variables
     protected final Long jobID;
     protected final String processID;
-    protected final IThreadContext threadContext;
     protected final IJobManager jobManager;
     protected final IIncrementalIngester ingester;
     protected final String connectionName;
     protected final IPipelineSpecification pipelineSpecification;
+    protected final ITransformationConnectionManager transformationConnectionManager;
+    protected final IOutputConnectionManager outputConnectionManager;
     protected final Map<String,QueuedDocument> previousDocuments;
     protected final long currentTime;
     protected final Long expireInterval;
@@ -1134,6 +1312,9 @@ public class WorkerThread extends Thread
     protected final OutputActivity ingestLogger;
     protected final IReprioritizationTracker rt;
     protected final String parameterVersion;
+
+    protected IPipelineConnections pipelineConnections = null;
+    protected IPipelineConnectionsWithVersions pipelineConnectionsWithVersions = null;
     
     // We submit references in bulk, because that's way more efficient.
     protected final Map<DocumentReference,DocumentReference> referenceList = new HashMap<DocumentReference,DocumentReference>();
@@ -1164,16 +1345,23 @@ public class WorkerThread extends Thread
     // This represents primary documents.
     protected final Set<String> touchedPrimarySet = new HashSet<String>();
     
+    /** Returns the pipeline connections for this activity, building them on first use and reusing them afterwards. */
+    protected IPipelineConnections getPipelineConnections() throws ManifoldCFException
+    {
+      if (pipelineConnections != null)
+        return pipelineConnections;
+      return pipelineConnections = new PipelineConnections(transformationConnectionManager,outputConnectionManager,pipelineSpecification);
+    }
+    
     /** Constructor.
     *@param jobManager is the job manager
     *@param ingester is the ingester
     */
     public ProcessActivity(Long jobID, String processID,
-      IThreadContext threadContext,
       IReprioritizationTracker rt, IJobManager jobManager,
       IIncrementalIngester ingester,
       String connectionName,
-      IPipelineSpecification pipelineSpecification,
+      IPipelineSpecification pipelineSpecification, ITransformationConnectionManager transformationConnectionManager, IOutputConnectionManager outputConnectionManager,
       Map<String,QueuedDocument> previousDocuments,
       long currentTime,
       Long expireInterval,
@@ -1187,12 +1375,13 @@ public class WorkerThread extends Thread
     {
       this.jobID = jobID;
       this.processID = processID;
-      this.threadContext = threadContext;
       this.rt = rt;
       this.jobManager = jobManager;
       this.ingester = ingester;
       this.connectionName = connectionName;
       this.pipelineSpecification = pipelineSpecification;
+      this.transformationConnectionManager = transformationConnectionManager;
+      this.outputConnectionManager = outputConnectionManager;
       this.previousDocuments = previousDocuments;
       this.currentTime = currentTime;
       this.expireInterval = expireInterval;
@@ -1608,7 +1797,7 @@ public class WorkerThread extends Thread
       // indicates that it should always be refetched.  But I have no way to describe this situation
       // in the database at the moment.
       ingester.documentIngest(
-        computePipelineSpecification(documentIdentifierHash,componentIdentifierHash,documentIdentifier),
+        new PipelineConnectionsWithVersions(getPipelineConnections(),computePipelineSpecification(documentIdentifierHash,componentIdentifierHash,documentIdentifier)),
         connectionName,documentIdentifierHash,componentIdentifierHash,
         version,parameterVersion,
         connection.getACLAuthority(),
@@ -1651,7 +1840,7 @@ public class WorkerThread extends Thread
       checkMultipleDispositions(documentIdentifier,componentIdentifier,componentIdentifierHash);
 
       ingester.documentNoData(
-        computePipelineSpecification(documentIdentifierHash,componentIdentifierHash,documentIdentifier),
+        new PipelineConnectionsWithVersions(getPipelineConnections(),computePipelineSpecification(documentIdentifierHash,componentIdentifierHash,documentIdentifier)),
         connectionName,documentIdentifierHash,componentIdentifierHash,
         version,parameterVersion,
         connection.getACLAuthority(),
@@ -1962,6 +2151,7 @@ public class WorkerThread extends Thread
 
         long currentTime = System.currentTimeMillis();
 
+        double currentMinimumDepth = rt.getMinimumDepth();
         rt.clearPreloadRequests();
         for (int j = 0; j < docidHashes.length; j++)
         {
@@ -1974,7 +2164,7 @@ public class WorkerThread extends Thread
 
           // Calculate desired document priority based on current queuetracker status.
           String[] bins = ManifoldCF.calculateBins(connector,dr.getLocalIdentifier());
-          PriorityCalculator p = new PriorityCalculator(rt,connection,bins);
+          PriorityCalculator p = new PriorityCalculator(rt,currentMinimumDepth,connection,bins);
           priorities[j] = p;
           p.makePreloadRequest();
         }
@@ -2054,7 +2244,7 @@ public class WorkerThread extends Thread
       throws ManifoldCFException, ServiceInterruption
     {
       return ingester.checkDateIndexable(
-        pipelineSpecification,date,
+        getPipelineConnections(),date,
         ingestLogger);
     }
 
@@ -2067,7 +2257,7 @@ public class WorkerThread extends Thread
       throws ManifoldCFException, ServiceInterruption
     {
       return ingester.checkMimeTypeIndexable(
-        pipelineSpecification,mimeType,
+        getPipelineConnections(),mimeType,
         ingestLogger);
     }
 
@@ -2080,7 +2270,7 @@ public class WorkerThread extends Thread
       throws ManifoldCFException, ServiceInterruption
     {
       return ingester.checkDocumentIndexable(
-        pipelineSpecification,localFile,
+        getPipelineConnections(),localFile,
         ingestLogger);
     }
 
@@ -2093,7 +2283,7 @@ public class WorkerThread extends Thread
       throws ManifoldCFException, ServiceInterruption
     {
       return ingester.checkLengthIndexable(
-        pipelineSpecification,length,
+        getPipelineConnections(),length,
         ingestLogger);
     }
 
@@ -2107,7 +2297,7 @@ public class WorkerThread extends Thread
       throws ManifoldCFException, ServiceInterruption
     {
       return ingester.checkURLIndexable(
-        pipelineSpecification,url,
+        getPipelineConnections(),url,
         ingestLogger);
     }