Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/06/14 20:14:43 UTC

svn commit: r1602613 [1/2] - in /manifoldcf/branches/CONNECTORS-962/framework: agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/ agents/src/main/java/org/apache/manifoldcf/agents/interfaces/ pull-agent/src/main/java/org/apache/manifo...

Author: kwright
Date: Sat Jun 14 18:14:42 2014
New Revision: 1602613

URL: http://svn.apache.org/r1602613
Log:
Get the code to build again

Added:
    manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/OutputKey.java   (with props)
    manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecification.java   (with props)
    manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecificationBasic.java   (with props)
    manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecificationWithVersions.java   (with props)
Modified:
    manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
    manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/DocumentIngestStatus.java
    manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java
    manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineSpecification.java
    manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineSpecificationWithVersions.java
    manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/DocumentDescription.java
    manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java
    manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteThread.java
    manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java
    manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/QueuedDocument.java
    manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/QueuedDocumentSet.java
    manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java
    manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java

Modified: manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java?rev=1602613&r1=1602612&r2=1602613&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java (original)
+++ manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java Sat Jun 14 18:14:42 2014
@@ -216,6 +216,21 @@ public class IncrementalIngester extends
     performDelete("",null,null);
   }
 
+  /** From a pipeline specification, get the name of the output connection that will be indexed last
+  * in the pipeline.
+  *@param pipelineSpecificationBasic is the basic pipeline specification.
+  *@return the last indexed output connection name.
+  */
+  @Override
+  public String getLastIndexedOutputConnectionName(IPipelineSpecificationBasic pipelineSpecificationBasic)
+  {
+    // It's always the last in the sequence.
+    int count = pipelineSpecificationBasic.getOutputCount();
+    if (count == 0)
+      return null;
+    return pipelineSpecificationBasic.getStageConnectionName(count-1);
+  }
+
   /** Check if a mime type is indexable.
   *@param outputConnectionName is the name of the output connection associated with this action.
   *@param outputDescription is the output description string.
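
The contract of the new getLastIndexedOutputConnectionName above, shown standalone (a JDK-only sketch; the List stands in for the IPipelineSpecificationBasic accessors, and all names here are illustrative): the final entry of the output sequence wins, and an empty pipeline yields null.

    import java.util.Arrays;
    import java.util.List;

    public class LastOutputDemo
    {
      // Mirrors the rule: last output in the sequence, or null when empty.
      static String lastIndexedOutput(List<String> outputConnectionNames)
      {
        if (outputConnectionNames.isEmpty())
          return null;
        return outputConnectionNames.get(outputConnectionNames.size() - 1);
      }

      public static void main(String[] args)
      {
        System.out.println(lastIndexedOutput(Arrays.asList("solr","elastic"))); // "elastic"
        System.out.println(lastIndexedOutput(Arrays.<String>asList()));         // null
      }
    }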
@@ -558,13 +573,15 @@ public class IncrementalIngester extends
     String newParameterVersion,
     String newAuthorityNameString)
   {
+    IPipelineSpecification pipelineSpecification = pipelineSpecificationWithVersions.getPipelineSpecification();
+    IPipelineSpecificationBasic basicSpecification = pipelineSpecification.getBasicPipelineSpecification();
     // Empty document version has a special meaning....
     if (newDocumentVersion.length() == 0)
       return true;
     // Otherwise, cycle through the outputs
-    for (int i = 0; i < pipelineSpecificationWithVersions.getOutputCount(); i++)
+    for (int i = 0; i < basicSpecification.getOutputCount(); i++)
     {
-      int stage = pipelineSpecificationWithVersions.getOutputStage(i);
+      int stage = basicSpecification.getOutputStage(i);
       String oldDocumentVersion = pipelineSpecificationWithVersions.getOutputDocumentVersionString(i);
       String oldParameterVersion = pipelineSpecificationWithVersions.getOutputParameterVersionString(i);
       String oldOutputVersion = pipelineSpecificationWithVersions.getOutputVersionString(i);
@@ -576,11 +593,11 @@ public class IncrementalIngester extends
       if (!oldDocumentVersion.equals(newDocumentVersion) ||
         !oldParameterVersion.equals(newParameterVersion) ||
         !oldAuthorityName.equals(newAuthorityNameString) ||
-        !oldOutputVersion.equals(pipelineSpecificationWithVersions.getStageDescriptionString(stage)))
+        !oldOutputVersion.equals(pipelineSpecification.getStageDescriptionString(stage)))
         return true;
       
       // Everything matches so far.  Next step is to compute a transformation path and corresponding version string.
-      String newTransformationVersion = computePackedTransformationVersion(pipelineSpecificationWithVersions,stage);
+      String newTransformationVersion = computePackedTransformationVersion(pipelineSpecification,stage);
       if (!pipelineSpecificationWithVersions.getOutputTransformationVersionString(i).equals(newTransformationVersion))
         return true;
     }
@@ -595,12 +612,13 @@ public class IncrementalIngester extends
   */
   protected static String computePackedTransformationVersion(IPipelineSpecification pipelineSpecification, int stage)
   {
+    IPipelineSpecificationBasic basicSpecification = pipelineSpecification.getBasicPipelineSpecification();
     // First, count the stages we need to represent
     int stageCount = 0;
     int currentStage = stage;
     while (true)
     {
-      int newStage = pipelineSpecification.getStageParent(currentStage);
+      int newStage = basicSpecification.getStageParent(currentStage);
       if (newStage == -1)
         break;
       stageCount++;
@@ -613,10 +631,10 @@ public class IncrementalIngester extends
     currentStage = stage;
     while (true)
     {
-      int newStage = pipelineSpecification.getStageParent(currentStage);
+      int newStage = basicSpecification.getStageParent(currentStage);
       if (newStage == -1)
         break;
-      stageNames[stageCount] = pipelineSpecification.getStageConnectionName(newStage);
+      stageNames[stageCount] = basicSpecification.getStageConnectionName(newStage);
       stageDescriptions[stageCount] = pipelineSpecification.getStageDescriptionString(newStage);
       stageCount++;
       currentStage = newStage;
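
The computePackedTransformationVersion change above keeps the two-pass parent walk: count the ancestor chain first so the arrays can be sized exactly, then walk the same chain again to fill them. The same idiom standalone, with an int[] parent array (-1 meaning no parent) and a String[] name array standing in for the getStageParent/getStageConnectionName accessors:

    // Collect the connection names of all ancestors of a stage, nearest first.
    static String[] ancestorNames(int[] parent, String[] name, int stage)
    {
      // Pass 1: count the ancestors so the result array can be sized exactly.
      int count = 0;
      for (int s = parent[stage]; s != -1; s = parent[s])
        count++;
      // Pass 2: walk the same chain again and record each ancestor's name.
      String[] rval = new String[count];
      int i = 0;
      for (int s = parent[stage]; s != -1; s = parent[s])
        rval[i++] = name[s];
      return rval;
    }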
@@ -1585,22 +1603,20 @@ public class IncrementalIngester extends
     throws ManifoldCFException
   {
     DeleteInfo[] rval = new DeleteInfo[identifierHashes.length];
-    HashMap map = new HashMap();
-    int i = 0;
-    while (i < identifierHashes.length)
+    Map<String,Integer> map = new HashMap<String,Integer>();
+    for (int i = 0; i < identifierHashes.length; i++)
     {
       map.put(makeKey(identifierClasses[i],identifierHashes[i]),new Integer(i));
       rval[i] = null;
-      i++;
     }
 
     beginTransaction();
     try
     {
-      ArrayList list = new ArrayList();
+      List<String> list = new ArrayList<String>();
       int maxCount = maxClauseDocumentURIChunk(outputConnectionName);
       int j = 0;
-      Iterator iter = map.keySet().iterator();
+      Iterator<String> iter = map.keySet().iterator();
       while (iter.hasNext())
       {
         if (j == maxCount)
@@ -1632,6 +1648,179 @@ public class IncrementalIngester extends
     }
   }
 
+  /** Look up ingestion data for a set of documents.
+  *@param rval is a map of output key to document data, in no particular order, which will be loaded with all matching results.
+  *@param pipelineSpecificationBasics are the pipeline specifications corresponding to the identifier classes and hashes.
+  *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
+  *@param identifierHashes is the array of document identifier hashes to look up.
+  */
+  @Override
+  public void getPipelineDocumentIngestDataMultiple(
+    Map<OutputKey,DocumentIngestStatus> rval,
+    IPipelineSpecificationBasic[] pipelineSpecificationBasics,
+    String[] identifierClasses, String[] identifierHashes)
+    throws ManifoldCFException
+  {
+    // Organize by pipeline spec.
+    Map<IPipelineSpecificationBasic,List<Integer>> keyMap = new HashMap<IPipelineSpecificationBasic,List<Integer>>();
+    for (int i = 0; i < pipelineSpecificationBasics.length; i++)
+    {
+      IPipelineSpecificationBasic spec = pipelineSpecificationBasics[i];
+      List<Integer> list = keyMap.get(spec);
+      if (list == null)
+      {
+        list = new ArrayList<Integer>();
+        keyMap.put(spec,list);
+      }
+      list.add(new Integer(i));
+    }
+
+    // Create the return array.
+    Iterator<IPipelineSpecificationBasic> iter = keyMap.keySet().iterator();
+    while (iter.hasNext())
+    {
+      IPipelineSpecificationBasic spec = iter.next();
+      List<Integer> list = keyMap.get(spec);
+      String[] localIdentifierClasses = new String[list.size()];
+      String[] localIdentifierHashes = new String[list.size()];
+      for (int i = 0; i < localIdentifierClasses.length; i++)
+      {
+        int index = list.get(i).intValue();
+        localIdentifierClasses[i] = identifierClasses[index];
+        localIdentifierHashes[i] = identifierHashes[index];
+      }
+      getPipelineDocumentIngestDataMultiple(rval,spec,localIdentifierClasses,localIdentifierHashes);
+    }
+  }
+
+  /** Look up ingestion data for a SET of documents.
+  *@param rval is a map of output key to document data, in no particular order, which will be loaded with all matching results.
+  *@param pipelineSpecificationBasic is the pipeline specification for all documents.
+  *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
+  *@param identifierHashes is the array of document identifier hashes to look up.
+  */
+  @Override
+  public void getPipelineDocumentIngestDataMultiple(
+    Map<OutputKey,DocumentIngestStatus> rval,
+    IPipelineSpecificationBasic pipelineSpecificationBasic,
+    String[] identifierClasses, String[] identifierHashes)
+    throws ManifoldCFException
+  {
+    String[] outputConnectionNames = extractOutputConnectionNames(pipelineSpecificationBasic);
+
+    // Build a map, so we can convert an identifier into an array index.
+    Map<String,Integer> indexMap = new HashMap<String,Integer>();
+    for (int i = 0; i < identifierHashes.length; i++)
+    {
+      indexMap.put(makeKey(identifierClasses[i],identifierHashes[i]),new Integer(i));
+    }
+
+    beginTransaction();
+    try
+    {
+      List<String> list = new ArrayList<String>();
+      int maxCount = maxClausePipelineDocumentIngestDataChunk(outputConnectionNames);
+      int j = 0;
+      Iterator<String> iter = indexMap.keySet().iterator();
+      while (iter.hasNext())
+      {
+        if (j == maxCount)
+        {
+          getPipelineDocumentIngestDataChunk(rval,indexMap,outputConnectionNames,list,identifierClasses,identifierHashes);
+          j = 0;
+          list.clear();
+        }
+        list.add(iter.next());
+        j++;
+      }
+      if (j > 0)
+        getPipelineDocumentIngestDataChunk(rval,indexMap,outputConnectionNames,list,identifierClasses,identifierHashes);
+    }
+    catch (ManifoldCFException e)
+    {
+      signalRollback();
+      throw e;
+    }
+    catch (Error e)
+    {
+      signalRollback();
+      throw e;
+    }
+    finally
+    {
+      endTransaction();
+    }
+
+  }
+
+  /** Get a chunk of document ingest data records.
+  *@param rval is the map of output key to document ingest status, into which results will be put.
+  *@param map is the map from document key to request index.
+  *@param outputConnectionNames are the output connection names to include in the query.
+  *@param list is the parameter list for the query.
+  */
+  protected void getPipelineDocumentIngestDataChunk(Map<OutputKey,DocumentIngestStatus> rval, Map<String,Integer> map, String[] outputConnectionNames, List<String> list,
+    String[] identifierClasses, String[] identifierHashes)
+    throws ManifoldCFException
+  {
+    ArrayList newList = new ArrayList();
+    String query = buildConjunctionClause(newList,new ClauseDescription[]{
+      new MultiClause(docKeyField,list),
+      new MultiClause(outputConnNameField,outputConnectionNames)});
+      
+    // Get the primary records associated with this hash value
+    IResultSet set = performQuery("SELECT "+idField+","+outputConnNameField+","+docKeyField+","+lastVersionField+","+lastOutputVersionField+","+authorityNameField+","+forcedParamsField+
+      " FROM "+getTableName()+" WHERE "+query,newList,null,null);
+
+    // Now, go through the original request once more, this time building the result
+    for (int i = 0; i < set.getRowCount(); i++)
+    {
+      IResultRow row = set.getRow(i);
+      String docHash = row.getValue(docKeyField).toString();
+      Integer position = map.get(docHash);
+      if (position != null)
+      {
+        Long id = (Long)row.getValue(idField);
+        String outputConnectionName = (String)row.getValue(outputConnNameField);
+        String lastVersion = (String)row.getValue(lastVersionField);
+        if (lastVersion == null)
+          lastVersion = "";
+        String lastTransformationVersion = (String)row.getValue(lastTransformationVersionField);
+        if (lastTransformationVersion == null)
+          lastTransformationVersion = "";
+        String lastOutputVersion = (String)row.getValue(lastOutputVersionField);
+        if (lastOutputVersion == null)
+          lastOutputVersion = "";
+        String paramVersion = (String)row.getValue(forcedParamsField);
+        if (paramVersion == null)
+          paramVersion = "";
+        String authorityName = (String)row.getValue(authorityNameField);
+        if (authorityName == null)
+          authorityName = "";
+        int indexValue = position.intValue();
+        rval.put(new OutputKey(identifierClasses[indexValue],identifierHashes[indexValue],outputConnectionName),
+          new DocumentIngestStatus(lastVersion,lastTransformationVersion,lastOutputVersion,paramVersion,authorityName));
+      }
+    }
+  }
+  
+  /** Look up ingestion data for a document.
+  *@param rval is a map of output key to document data, in no particular order, which will be loaded with all matching results.
+  *@param pipelineSpecificationBasic is the pipeline specification for the document.
+  *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
+  *@param identifierHash is the hash of the id of the document.
+  */
+  @Override
+  public void getPipelineDocumentIngestData(
+    Map<OutputKey,DocumentIngestStatus> rval,
+    IPipelineSpecificationBasic pipelineSpecificationBasic,
+    String identifierClass, String identifierHash)
+    throws ManifoldCFException
+  {
+    getPipelineDocumentIngestDataMultiple(rval,pipelineSpecificationBasic,
+      new String[]{identifierClass},new String[]{identifierHash});
+  }
+
   /** Look up ingestion data for a SET of documents.
   *@param outputConnectionNames are the names of the output connections associated with this action.
   *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
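
The segregation step in getPipelineDocumentIngestDataMultiple above is a reusable idiom: bucket the indexes of a request array by key, then service each bucket with one batched call. A generic JDK-only sketch of the bucketing (the type parameter stands in for IPipelineSpecificationBasic):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class IndexGrouping
    {
      // Map each distinct key to the list of positions at which it occurs.
      static <K> Map<K,List<Integer>> groupIndexesByKey(K[] keys)
      {
        Map<K,List<Integer>> keyMap = new HashMap<K,List<Integer>>();
        for (int i = 0; i < keys.length; i++)
        {
          List<Integer> list = keyMap.get(keys[i]);
          if (list == null)
          {
            list = new ArrayList<Integer>();
            keyMap.put(keys[i],list);
          }
          list.add(Integer.valueOf(i));
        }
        return keyMap;
      }
    }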
@@ -1640,50 +1829,45 @@ public class IncrementalIngester extends
   * exist in the index.
   */
   @Override
+  @Deprecated
   public DocumentIngestStatus[] getDocumentIngestDataMultiple(String[] outputConnectionNames,
     String[] identifierClasses, String[] identifierHashes)
     throws ManifoldCFException
   {
     // Segregate request by connection names
-    HashMap keyMap = new HashMap();
-    int i = 0;
-    while (i < outputConnectionNames.length)
+    Map<String,List<Integer>> keyMap = new HashMap<String,List<Integer>>();
+    for (int i = 0; i < outputConnectionNames.length; i++)
     {
       String outputConnectionName = outputConnectionNames[i];
-      ArrayList list = (ArrayList)keyMap.get(outputConnectionName);
+      List<Integer> list = keyMap.get(outputConnectionName);
       if (list == null)
       {
-        list = new ArrayList();
+        list = new ArrayList<Integer>();
         keyMap.put(outputConnectionName,list);
       }
       list.add(new Integer(i));
-      i++;
     }
 
     // Create the return array.
     DocumentIngestStatus[] rval = new DocumentIngestStatus[outputConnectionNames.length];
-    Iterator iter = keyMap.keySet().iterator();
+    Iterator<String> iter = keyMap.keySet().iterator();
     while (iter.hasNext())
     {
-      String outputConnectionName = (String)iter.next();
-      ArrayList list = (ArrayList)keyMap.get(outputConnectionName);
+      String outputConnectionName = iter.next();
+      List<Integer> list = keyMap.get(outputConnectionName);
       String[] localIdentifierClasses = new String[list.size()];
       String[] localIdentifierHashes = new String[list.size()];
-      i = 0;
-      while (i < localIdentifierClasses.length)
+      for (int i = 0; i < localIdentifierClasses.length; i++)
       {
-        int index = ((Integer)list.get(i)).intValue();
+        int index = list.get(i).intValue();
         localIdentifierClasses[i] = identifierClasses[index];
         localIdentifierHashes[i] = identifierHashes[index];
-        i++;
       }
       DocumentIngestStatus[] localRval = getDocumentIngestDataMultiple(outputConnectionName,localIdentifierClasses,localIdentifierHashes);
-      i = 0;
-      while (i < localRval.length)
+      for (int i = 0; i < localRval.length; i++)
       {
-        int index = ((Integer)list.get(i)).intValue();
+        int index = list.get(i).intValue();
         rval[index] = localRval[i];
-        i++;
       }
     }
     return rval;
@@ -1697,6 +1881,7 @@ public class IncrementalIngester extends
   * exist in the index.
   */
   @Override
+  @Deprecated
   public DocumentIngestStatus[] getDocumentIngestDataMultiple(String outputConnectionName,
     String[] identifierClasses, String[] identifierHashes)
     throws ManifoldCFException
@@ -1705,22 +1890,20 @@ public class IncrementalIngester extends
     DocumentIngestStatus[] rval = new DocumentIngestStatus[identifierHashes.length];
 
     // Build a map, so we can convert an identifier into an array index.
-    HashMap indexMap = new HashMap();
-    int i = 0;
-    while (i < identifierHashes.length)
+    Map<String,Integer> indexMap = new HashMap<String,Integer>();
+    for (int i = 0; i < identifierHashes.length; i++)
     {
       indexMap.put(makeKey(identifierClasses[i],identifierHashes[i]),new Integer(i));
       rval[i] = null;
-      i++;
     }
 
     beginTransaction();
     try
     {
-      ArrayList list = new ArrayList();
+      List<String> list = new ArrayList<String>();
       int maxCount = maxClauseDocumentIngestDataChunk(outputConnectionName);
       int j = 0;
-      Iterator iter = indexMap.keySet().iterator();
+      Iterator<String> iter = indexMap.keySet().iterator();
       while (iter.hasNext())
       {
         if (j == maxCount)
@@ -1759,6 +1942,7 @@ public class IncrementalIngester extends
   *@return the current document's ingestion data, or null if the document is not currently ingested.
   */
   @Override
+  @Deprecated
   public DocumentIngestStatus getDocumentIngestData(String outputConnectionName,
     String identifierClass, String identifierHash)
     throws ManifoldCFException
@@ -1768,60 +1952,46 @@ public class IncrementalIngester extends
 
   /** Calculate the average time interval between changes for a document.
   * This is based on the data gathered for the document.
-  *@param outputConnectionName is the name of the output connection associated with this action.
-  *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
-  *@param identifierHash is the hash of the id of the document.
-  *@return the number of milliseconds between changes, or 0 if this cannot be calculated.
-  */
-  @Override
-  public long getDocumentUpdateInterval(String outputConnectionName,
-    String identifierClass, String identifierHash)
-    throws ManifoldCFException
-  {
-    return getDocumentUpdateIntervalMultiple(outputConnectionName,new String[]{identifierClass},new String[]{identifierHash})[0];
-  }
-
-  /** Calculate the average time interval between changes for a document.
-  * This is based on the data gathered for the document.
-  *@param outputConnectionName is the name of the output connection associated with this action.
+  *@param pipelineSpecificationBasic is the basic pipeline specification.
   *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
   *@param identifierHashes are the hashes of the ids of the documents.
   *@return the number of milliseconds between changes, or 0 if this cannot be calculated.
   */
-  @Override
-  public long[] getDocumentUpdateIntervalMultiple(String outputConnectionName,
+  public long[] getDocumentUpdateIntervalMultiple(
+    IPipelineSpecificationBasic pipelineSpecificationBasic,
     String[] identifierClasses, String[] identifierHashes)
     throws ManifoldCFException
   {
+    // Get the output connection names
+    String[] outputConnectionNames = extractOutputConnectionNames(pipelineSpecificationBasic);
+
     // Do these all at once!!
     // First, create a return array
     long[] rval = new long[identifierHashes.length];
     // Also create a map from identifier to return index.
-    HashMap returnMap = new HashMap();
+    Map<String,Integer> returnMap = new HashMap<String,Integer>();
     // Finally, need the set of hash codes
-    HashMap idCodes = new HashMap();
-    int j = 0;
-    while (j < identifierHashes.length)
+    Set<String> idCodes = new HashSet<String>();
+    for (int j = 0; j < identifierHashes.length; j++)
     {
       String key = makeKey(identifierClasses[j],identifierHashes[j]);
-      rval[j] = 0L;
+      rval[j] = Long.MAX_VALUE;
       returnMap.put(key,new Integer(j));
-      idCodes.put(key,key);
-      j++;
+      idCodes.add(key);
     }
 
     // Get the chunk size
-    int maxClause = maxClauseGetIntervals(outputConnectionName);
+    int maxClause = maxClauseGetIntervals(outputConnectionNames);
 
     // Loop through the hash codes
-    Iterator iter = idCodes.keySet().iterator();
-    ArrayList list = new ArrayList();
-    j = 0;
+    Iterator<String> iter = idCodes.iterator();
+    List<String> list = new ArrayList<String>();
+    int j = 0;
     while (iter.hasNext())
     {
       if (j == maxClause)
       {
-        getIntervals(rval,outputConnectionName,list,returnMap);
+        getIntervals(rval,outputConnectionNames,list,returnMap);
         list.clear();
         j = 0;
       }
@@ -1831,40 +2001,97 @@ public class IncrementalIngester extends
     }
 
     if (j > 0)
-      getIntervals(rval,outputConnectionName,list,returnMap);
+      getIntervals(rval,outputConnectionNames,list,returnMap);
 
+    for (int i = 0; i < rval.length; i++)
+    {
+      if (rval[i] == Long.MAX_VALUE)
+        rval[i] = 0;
+    }
+    
     return rval;
+
   }
 
+  /** Calculate the average time interval between changes for a document.
+  * This is based on the data gathered for the document.
+  *@param pipelineSpecificationBasic is the basic pipeline specification.
+  *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
+  *@param identifierHash is the hash of the id of the document.
+  *@return the number of milliseconds between changes, or 0 if this cannot be calculated.
+  */
+  @Override
+  public long getDocumentUpdateInterval(
+    IPipelineSpecificationBasic pipelineSpecificationBasic,
+    String identifierClass, String identifierHash)
+    throws ManifoldCFException
+  {
+    return getDocumentUpdateIntervalMultiple(
+      pipelineSpecificationBasic,
+      new String[]{identifierClass},new String[]{identifierHash})[0];
+  }
+
+  /** Calculate the average time interval between changes for a document.
+  * This is based on the data gathered for the document.
+  *@param outputConnectionName is the name of the output connection associated with this action.
+  *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
+  *@param identifierHash is the hash of the id of the document.
+  *@return the number of milliseconds between changes, or 0 if this cannot be calculated.
+  */
+  @Override
+  @Deprecated
+  public long getDocumentUpdateInterval(String outputConnectionName,
+    String identifierClass, String identifierHash)
+    throws ManifoldCFException
+  {
+    return getDocumentUpdateIntervalMultiple(outputConnectionName,new String[]{identifierClass},new String[]{identifierHash})[0];
+  }
+
+  /** Calculate the average time interval between changes for a document.
+  * This is based on the data gathered for the document.
+  *@param outputConnectionName is the name of the output connection associated with this action.
+  *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
+  *@param identifierHashes are the hashes of the ids of the documents.
+  *@return the number of milliseconds between changes, or 0 if this cannot be calculated.
+  */
+  @Override
+  @Deprecated
+  public long[] getDocumentUpdateIntervalMultiple(String outputConnectionName,
+    String[] identifierClasses, String[] identifierHashes)
+    throws ManifoldCFException
+  {
+    return getDocumentUpdateIntervalMultiple(new RuntPipelineSpecificationBasic(outputConnectionName),
+      identifierClasses,identifierHashes);
+  }
+  
   /** Calculate the number of clauses.
   */
-  protected int maxClauseGetIntervals(String outputConnectionName)
+  protected int maxClauseGetIntervals(String[] outputConnectionNames)
   {
     return findConjunctionClauseMax(new ClauseDescription[]{
-      new UnitaryClause(outputConnNameField,outputConnectionName)});
-    }
-    
+      new MultiClause(outputConnNameField,outputConnectionNames)});
+  }
+  
   /** Query for and calculate the interval for a bunch of hashcodes.
   *@param rval is the array to stuff calculated return values into.
   *@param list is the list of parameters.
   *@param outputConnectionNames are the output connection names to include in the query.
   *@param returnMap is a mapping from document id to rval index.
   */
-  protected void getIntervals(long[] rval, String outputConnectionName, ArrayList list, HashMap returnMap)
+  protected void getIntervals(long[] rval, String[] outputConnectionNames, List<String> list, Map<String,Integer> returnMap)
     throws ManifoldCFException
   {
     ArrayList newList = new ArrayList();
     String query = buildConjunctionClause(newList,new ClauseDescription[]{
       new MultiClause(docKeyField,list),
-      new UnitaryClause(outputConnNameField,outputConnectionName)});
+      new MultiClause(outputConnNameField,outputConnectionNames)});
       
     IResultSet set = performQuery("SELECT "+docKeyField+","+changeCountField+","+firstIngestField+","+lastIngestField+
       " FROM "+getTableName()+" WHERE "+query,newList,null,null);
 
-    int i = 0;
-    while (i < set.getRowCount())
+    for (int i = 0; i < set.getRowCount(); i++)
     {
-      IResultRow row = set.getRow(i++);
+      IResultRow row = set.getRow(i);
       String docHash = (String)row.getValue(docKeyField);
       Integer index = (Integer)returnMap.get(docHash);
       if (index != null)
@@ -1873,7 +2100,10 @@ public class IncrementalIngester extends
         long changeCount = ((Long)row.getValue(changeCountField)).longValue();
         long firstIngest = ((Long)row.getValue(firstIngestField)).longValue();
         long lastIngest = ((Long)row.getValue(lastIngestField)).longValue();
-        rval[index.intValue()] = (long)(((double)(lastIngest-firstIngest))/(double)changeCount);
+        int indexValue = index.intValue();
+        long newValue = (long)(((double)(lastIngest-firstIngest))/(double)changeCount);
+        if (newValue < rval[indexValue])
+          rval[indexValue] = newValue;
       }
     }
   }
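
The getIntervals change just above replaces last-writer-wins with a minimum across outputs: the caller seeds every rval slot with Long.MAX_VALUE, each matching row proposes an interval, the smallest proposal is kept, and slots never written are mapped back to 0 afterwards. The sentinel pattern in isolation:

    // Combine per-output update-interval proposals for one document.
    static long combineIntervals(long[] proposals)
    {
      long best = Long.MAX_VALUE; // sentinel: no data seen yet
      for (long newValue : proposals)
      {
        if (newValue < best)
          best = newValue; // keep the most conservative (shortest) interval
      }
      return (best == Long.MAX_VALUE) ? 0L : best; // 0 means "cannot be calculated"
    }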
@@ -2115,7 +2345,7 @@ public class IncrementalIngester extends
   *@param map is the map from document key to array index.
   *@param list is the parameter list for the query.
   */
-  protected void getDocumentURIChunk(DeleteInfo[] rval, Map map, String outputConnectionName, ArrayList list)
+  protected void getDocumentURIChunk(DeleteInfo[] rval, Map<String,Integer> map, String outputConnectionName, List<String> list)
     throws ManifoldCFException
   {
     ArrayList newList = new ArrayList();
@@ -2127,10 +2357,9 @@ public class IncrementalIngester extends
       query,newList,null,null);
 
     // Go through list and put into buckets.
-    int i = 0;
-    while (i < set.getRowCount())
+    for (int i = 0; i < set.getRowCount(); i++)
     {
-      IResultRow row = set.getRow(i++);
+      IResultRow row = set.getRow(i);
       String docHash = row.getValue(docKeyField).toString();
       Integer position = (Integer)map.get(docHash);
       if (position != null)
@@ -2151,6 +2380,14 @@ public class IncrementalIngester extends
     return findConjunctionClauseMax(new ClauseDescription[]{
       new UnitaryClause(outputConnNameField,outputConnectionName)});
   }
+
+  /** Count the clauses.
+  */
+  protected int maxClausePipelineDocumentIngestDataChunk(String[] outputConnectionNames)
+  {
+    return findConjunctionClauseMax(new ClauseDescription[]{
+      new MultiClause(outputConnNameField,outputConnectionNames)});
+  }
   
   /** Get a chunk of document ingest data records.
   *@param rval is the document ingest status array where the data should be put.
@@ -2158,7 +2395,7 @@ public class IncrementalIngester extends
   *@param map is the map from document key to array index.
   *@param list is the parameter list for the query.
   */
-  protected void getDocumentIngestDataChunk(DocumentIngestStatus[] rval, Map map, String outputConnectionName, ArrayList list)
+  protected void getDocumentIngestDataChunk(DocumentIngestStatus[] rval, Map<String,Integer> map, String outputConnectionName, List<String> list)
     throws ManifoldCFException
   {
     ArrayList newList = new ArrayList();
@@ -2171,12 +2408,11 @@ public class IncrementalIngester extends
       " FROM "+getTableName()+" WHERE "+query,newList,null,null);
 
     // Now, go through the original request once more, this time building the result
-    int i = 0;
-    while (i < set.getRowCount())
+    for (int i = 0; i < set.getRowCount(); i++)
     {
-      IResultRow row = set.getRow(i++);
+      IResultRow row = set.getRow(i);
       String docHash = row.getValue(docKeyField).toString();
-      Integer position = (Integer)map.get(docHash);
+      Integer position = map.get(docHash);
       if (position != null)
       {
         Long id = (Long)row.getValue(idField);
@@ -2195,7 +2431,9 @@ public class IncrementalIngester extends
         String authorityName = (String)row.getValue(authorityNameField);
         if (authorityName == null)
           authorityName = "";
-        rval[position.intValue()] = new DocumentIngestStatus(lastVersion,lastTransformationVersion,lastOutputVersion,paramVersion,authorityName);
+        int indexValue = position.intValue();
+        rval[indexValue] = new DocumentIngestStatus(
+          lastVersion,lastTransformationVersion,lastOutputVersion,paramVersion,authorityName);
       }
     }
   }
@@ -2502,10 +2740,11 @@ public class IncrementalIngester extends
       Map<Integer,PipelineCheckEntryPoint> currentSet = new HashMap<Integer,PipelineCheckEntryPoint>();
       // First, locate all the output stages, and enter them into the set
       IPipelineSpecification spec = pipelineConnections.getSpecification();
-      int count = spec.getOutputCount();
+      IPipelineSpecificationBasic basicSpec = spec.getBasicPipelineSpecification();
+      int count = basicSpec.getOutputCount();
       for (int i = 0; i < count; i++)
       {
-        int outputStage = spec.getOutputStage(i);
+        int outputStage = basicSpec.getOutputStage(i);
         PipelineCheckEntryPoint outputStageEntryPoint = new PipelineCheckEntryPoint(
           outputConnectors[pipelineConnections.getOutputConnectionIndex(outputStage).intValue()],
           spec.getStageDescriptionString(outputStage),finalActivity);
@@ -2518,9 +2757,9 @@ public class IncrementalIngester extends
         int[] siblings = null;
         for (Integer outputStage : currentSet.keySet())
         {
-          parent = spec.getStageParent(outputStage.intValue());
+          parent = basicSpec.getStageParent(outputStage.intValue());
           // Look up the children
-          siblings = spec.getStageChildren(parent);
+          siblings = basicSpec.getStageChildren(parent);
           // Are all the siblings in the current set yet?  If not, we can't proceed with this entry.
           boolean skipToNext = false;
           for (int sibling : siblings)
@@ -2595,18 +2834,21 @@ public class IncrementalIngester extends
       // Create the current set
       Map<Integer,PipelineAddEntryPoint> currentSet = new HashMap<Integer,PipelineAddEntryPoint>();
       // First, locate all the output stages, and enter them into the set
-      IPipelineSpecificationWithVersions spec = pipelineConnectionsWithVersions.getSpecificationWithVersions();
-      int outputCount = spec.getOutputCount();
+      IPipelineSpecificationWithVersions fullSpec = pipelineConnectionsWithVersions.getSpecificationWithVersions();
+      IPipelineSpecification pipelineSpec = fullSpec.getPipelineSpecification();
+      IPipelineSpecificationBasic basicSpec = pipelineSpec.getBasicPipelineSpecification();
+      
+      int outputCount = basicSpec.getOutputCount();
       for (int i = 0; i < outputCount; i++)
       {
-        int outputStage = spec.getOutputStage(i);
+        int outputStage = basicSpec.getOutputStage(i);
         
         // Compute whether we need to reindex this record to this output or not, based on spec.
-        String oldDocumentVersion = spec.getOutputDocumentVersionString(i);
-        String oldParameterVersion = spec.getOutputParameterVersionString(i);
-        String oldOutputVersion = spec.getOutputVersionString(i);
-        String oldTransformationVersion = spec.getOutputTransformationVersionString(i);
-        String oldAuthorityName = spec.getAuthorityNameString(i);
+        String oldDocumentVersion = fullSpec.getOutputDocumentVersionString(i);
+        String oldParameterVersion = fullSpec.getOutputParameterVersionString(i);
+        String oldOutputVersion = fullSpec.getOutputVersionString(i);
+        String oldTransformationVersion = fullSpec.getOutputTransformationVersionString(i);
+        String oldAuthorityName = fullSpec.getAuthorityNameString(i);
 
         String newTransformationVersion = null;
         
@@ -2615,23 +2857,23 @@ public class IncrementalIngester extends
         {
           needToReindex = (!oldDocumentVersion.equals(newDocumentVersion) ||
             !oldParameterVersion.equals(newParameterVersion) ||
-            !oldOutputVersion.equals(spec.getStageDescriptionString(outputStage)) ||
+            !oldOutputVersion.equals(pipelineSpec.getStageDescriptionString(outputStage)) ||
             !oldAuthorityName.equals(newAuthorityNameString));
         }
         if (needToReindex == false)
         {
           // Compute the transformation version string
-          newTransformationVersion = computePackedTransformationVersion(spec,outputStage);
+          newTransformationVersion = computePackedTransformationVersion(pipelineSpec,outputStage);
           needToReindex = (!oldTransformationVersion.equals(newTransformationVersion));
         }
 
         int connectionIndex = pipelineConnectionsWithVersions.getOutputConnectionIndex(outputStage).intValue();
         PipelineAddEntryPoint outputStageEntryPoint = new OutputAddEntryPoint(
           outputConnectors[connectionIndex],
-          spec.getStageDescriptionString(outputStage),
-          new OutputActivitiesWrapper(finalActivity,spec.getStageConnectionName(outputStage)),
+          pipelineSpec.getStageDescriptionString(outputStage),
+          new OutputActivitiesWrapper(finalActivity,basicSpec.getStageConnectionName(outputStage)),
           needToReindex,
-          spec.getStageConnectionName(outputStage),
+          basicSpec.getStageConnectionName(outputStage),
           newTransformationVersion,
           ingestTime,
           newDocumentVersion,
@@ -2646,9 +2888,9 @@ public class IncrementalIngester extends
         int[] siblings = null;
         for (Integer outputStage : currentSet.keySet())
         {
-          parent = spec.getStageParent(outputStage.intValue());
+          parent = basicSpec.getStageParent(outputStage.intValue());
           // Look up the children
-          siblings = spec.getStageChildren(parent);
+          siblings = basicSpec.getStageChildren(parent);
           // Are all the siblings in the current set yet?  If not, we can't proceed with this entry.
           boolean skipToNext = false;
           for (int sibling : siblings)
@@ -2680,13 +2922,13 @@ public class IncrementalIngester extends
         // Wrap the entry points in a fan-out class, which has pipe connector-like methods that fire across all the connectors.
         PipelineAddFanout pcf = new PipelineAddFanout(siblingEntryPoints,
           (parent==-1)?null:new TransformationRecordingActivity(finalActivity,
-            spec.getStageConnectionName(parent)),
+            basicSpec.getStageConnectionName(parent)),
           finalActivity);
         if (parent == -1)
           return pcf;
         PipelineAddEntryPoint newEntry = new PipelineAddEntryPoint(
           transformationConnectors[pipelineConnections.getTransformationConnectionIndex(parent).intValue()],
-          spec.getStageDescriptionString(parent),pcf,pcf.checkNeedToReindex());
+          pipelineSpec.getStageDescriptionString(parent),pcf,pcf.checkNeedToReindex());
         currentSet.put(new Integer(parent), newEntry);
       }
 
@@ -3277,7 +3519,16 @@ public class IncrementalIngester extends
       super(outputConnectionName);
       this.outputDescriptionString = outputDescriptionString;
     }
-    
+
+    /** Get the basic pipeline specification.
+    *@return the specification.
+    */
+    @Override
+    public IPipelineSpecificationBasic getBasicPipelineSpecification()
+    {
+      return this;
+    }
+
     /** Get the description string for a pipeline stage.
     *@param stage is the stage to get the description string for.
     *@return the description string for that stage.
@@ -3312,11 +3563,21 @@ public class IncrementalIngester extends
       this.oldTransformationVersion = oldTransformationVersion;
       this.oldAuthorityNameString = oldAuthorityNameString;
     }
-    
+
+    /** Get pipeline specification.
+    *@return the pipeline specification.
+    */
+    @Override
+    public IPipelineSpecification getPipelineSpecification()
+    {
+      return this;
+    }
+
     /** For a given output index, return a document version string.
     *@param index is the output index.
     *@return the document version string.
     */
+    @Override
     public String getOutputDocumentVersionString(int index)
     {
       if (index == 0)
@@ -3328,6 +3589,7 @@ public class IncrementalIngester extends
     *@param index is the output index.
     *@return the parameter version string.
     */
+    @Override
     public String getOutputParameterVersionString(int index)
     {
       if (index == 0)
@@ -3339,6 +3601,7 @@ public class IncrementalIngester extends
     *@param index is the output index.
     *@return the transformation version string.
     */
+    @Override
     public String getOutputTransformationVersionString(int index)
     {
       if (index == 0)
@@ -3350,6 +3613,7 @@ public class IncrementalIngester extends
     *@param index is the output index.
     *@return the output version string.
     */
+    @Override
     public String getOutputVersionString(int index)
     {
       if (index == 0)
@@ -3361,6 +3625,7 @@ public class IncrementalIngester extends
     *@param index is the output index.
     *@return the authority name string.
     */
+    @Override
     public String getAuthorityNameString(int index)
     {
       if (index == 0)
@@ -3388,17 +3653,18 @@ public class IncrementalIngester extends
       throws ManifoldCFException
     {
       this.spec = spec;
+      IPipelineSpecificationBasic basicSpec = spec.getBasicPipelineSpecification();
       // Now, load all the connections we'll ever need, being sure to only load one copy of each.
       // We first segregate them into unique transformation and output connections.
-      int count = spec.getStageCount();
+      int count = basicSpec.getStageCount();
       Set<String> transformations = new HashSet<String>();
       Set<String> outputs = new HashSet<String>();
       for (int i = 0; i < count; i++)
       {
-        if (spec.checkStageOutputConnection(i))
-          outputs.add(spec.getStageConnectionName(i));
+        if (basicSpec.checkStageOutputConnection(i))
+          outputs.add(basicSpec.getStageConnectionName(i));
         else
-          transformations.add(spec.getStageConnectionName(i));
+          transformations.add(basicSpec.getStageConnectionName(i));
       }
       
       Map<String,Integer> transformationNameMap = new HashMap<String,Integer>();
@@ -3424,13 +3690,13 @@ public class IncrementalIngester extends
       for (int i = 0; i < count; i++)
       {
         Integer k;
-        if (spec.checkStageOutputConnection(i))
+        if (basicSpec.checkStageOutputConnection(i))
         {
-          outputConnectionLookupMap.put(new Integer(i),outputNameMap.get(spec.getStageConnectionName(i)));
+          outputConnectionLookupMap.put(new Integer(i),outputNameMap.get(basicSpec.getStageConnectionName(i)));
         }
         else
         {
-          transformationConnectionLookupMap.put(new Integer(i),transformationNameMap.get(spec.getStageConnectionName(i)));
+          transformationConnectionLookupMap.put(new Integer(i),transformationNameMap.get(basicSpec.getStageConnectionName(i)));
         }
       }
     }
@@ -3479,7 +3745,7 @@ public class IncrementalIngester extends
     public PipelineConnectionsWithVersions(IPipelineSpecificationWithVersions pipelineSpecificationWithVersions)
       throws ManifoldCFException
     {
-      super(pipelineSpecificationWithVersions);
+      super(pipelineSpecificationWithVersions.getPipelineSpecification());
       this.pipelineSpecificationWithVersions = pipelineSpecificationWithVersions;
     }
     

Modified: manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/DocumentIngestStatus.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/DocumentIngestStatus.java?rev=1602613&r1=1602612&r2=1602613&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/DocumentIngestStatus.java (original)
+++ manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/DocumentIngestStatus.java Sat Jun 14 18:14:42 2014
@@ -38,10 +38,12 @@ public class DocumentIngestStatus
   protected final String documentAuthorityNameString;
 
   /** Constructor */
-  public DocumentIngestStatus(String documentVersionString,
+  public DocumentIngestStatus(
+    String documentVersionString,
     String transformationVersionString, String outputVersionString, String parameterVersionString,
     String documentAuthorityNameString)
   {
+    // Looked-up information
     this.documentVersionString = documentVersionString;
     this.transformationVersionString = transformationVersionString;
     this.outputVersionString = outputVersionString;

Modified: manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java?rev=1602613&r1=1602612&r2=1602613&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java (original)
+++ manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java Sat Jun 14 18:14:42 2014
@@ -60,6 +60,13 @@ public interface IIncrementalIngester
     throws ManifoldCFException;
 
 
+  /** From a pipeline specification, get the name of the output connection that will be indexed last
+  * in the pipeline.
+  *@param pipelineSpecificationBasic is the basic pipeline specification.
+  *@return the last indexed output connection name.
+  */
+  public String getLastIndexedOutputConnectionName(IPipelineSpecificationBasic pipelineSpecificationBasic);
+  
   /** Get an output version string for a document.
   *@param outputConnectionName is the name of the output connection associated with this action.
   *@param spec is the output specification.
@@ -432,6 +439,7 @@ public interface IIncrementalIngester
   *@return the array of document data.  Null will come back for any identifier that doesn't
   * exist in the index.
   */
+  @Deprecated
   public DocumentIngestStatus[] getDocumentIngestDataMultiple(String[] outputConnectionNames,
     String[] identifierClasses, String[] identifierHashes)
     throws ManifoldCFException;
@@ -443,6 +451,7 @@ public interface IIncrementalIngester
   *@return the array of document data.  Null will come back for any identifier that doesn't
   * exist in the index.
   */
+  @Deprecated
   public DocumentIngestStatus[] getDocumentIngestDataMultiple(String outputConnectionName,
     String[] identifierClasses, String[] identifierHashes)
     throws ManifoldCFException;
@@ -453,10 +462,47 @@ public interface IIncrementalIngester
   *@param identifierHash is the hash of the id of the document.
   *@return the current document's ingestion data, or null if the document is not currently ingested.
   */
+  @Deprecated
   public DocumentIngestStatus getDocumentIngestData(String outputConnectionName,
     String identifierClass, String identifierHash)
     throws ManifoldCFException;
 
+  /** Look up ingestion data for a set of documents.
+  *@param rval is a map of output key to document data, in no particular order, which will be loaded with all matching results.
+  *@param pipelineSpecificationBasics are the pipeline specifications corresponding to the identifier classes and hashes.
+  *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
+  *@param identifierHashes is the array of document identifier hashes to look up.
+  */
+  public void getPipelineDocumentIngestDataMultiple(
+    Map<OutputKey,DocumentIngestStatus> rval,
+    IPipelineSpecificationBasic[] pipelineSpecificationBasics,
+    String[] identifierClasses, String[] identifierHashes)
+    throws ManifoldCFException;
+
+  /** Look up ingestion data for a SET of documents.
+  *@param rval is a map of output key to document data, in no particular order, which will be loaded with all matching results.
+  *@param pipelineSpecificationBasic is the pipeline specification for all documents.
+  *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
+  *@param identifierHashes is the array of document identifier hashes to look up.
+  */
+  public void getPipelineDocumentIngestDataMultiple(
+    Map<OutputKey,DocumentIngestStatus> rval,
+    IPipelineSpecificationBasic pipelineSpecificationBasic,
+    String[] identifierClasses, String[] identifierHashes)
+    throws ManifoldCFException;
+
+  /** Look up ingestion data for a document.
+  *@param rval is a map of output key to document data, in no particular order, which will be loaded with all matching results.
+  *@param pipelineSpecificationBasic is the pipeline specification for the document.
+  *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
+  *@param identifierHash is the hash of the id of the document.
+  */
+  public void getPipelineDocumentIngestData(
+    Map<OutputKey,DocumentIngestStatus> rval,
+    IPipelineSpecificationBasic pipelineSpecificationBasic,
+    String identifierClass, String identifierHash)
+    throws ManifoldCFException;
+
   /** Calculate the average time interval between changes for a document.
   * This is based on the data gathered for the document.
   *@param outputConnectionName is the name of the output connection associated with this action.
@@ -464,6 +510,7 @@ public interface IIncrementalIngester
   *@param identifierHashes are the hashes of the ids of the documents.
   *@return the number of milliseconds between changes, or 0 if this cannot be calculated.
   */
+  @Deprecated
   public long[] getDocumentUpdateIntervalMultiple(String outputConnectionName,
     String[] identifierClasses, String[] identifierHashes)
     throws ManifoldCFException;
@@ -475,10 +522,35 @@ public interface IIncrementalIngester
   *@param identifierHash is the hash of the id of the document.
   *@return the number of milliseconds between changes, or 0 if this cannot be calculated.
   */
+  @Deprecated
   public long getDocumentUpdateInterval(String outputConnectionName,
     String identifierClass, String identifierHash)
     throws ManifoldCFException;
 
+  /** Calculate the average time interval between changes for a document.
+  * This is based on the data gathered for the document.
+  *@param pipelineSpecificationBasic is the basic pipeline specification.
+  *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
+  *@param identifierHashes are the hashes of the ids of the documents.
+  *@return the number of milliseconds between changes, or 0 if this cannot be calculated.
+  */
+  public long[] getDocumentUpdateIntervalMultiple(
+    IPipelineSpecificationBasic pipelineSpecificationBasic,
+    String[] identifierClasses, String[] identifierHashes)
+    throws ManifoldCFException;
+
+  /** Calculate the average time interval between changes for a document.
+  * This is based on the data gathered for the document.
+  *@param pipelineSpecificationBasic is the basic pipeline specification.
+  *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
+  *@param identifierHash is the hash of the id of the document.
+  *@return the number of milliseconds between changes, or 0 if this cannot be calculated.
+  */
+  public long getDocumentUpdateInterval(
+    IPipelineSpecificationBasic pipelineSpecificationBasic,
+    String identifierClass, String identifierHash)
+    throws ManifoldCFException;
+
   /** Reset all documents belonging to a specific output connection, because we've got information that
   * that system has been reconfigured.  This will force all such documents to be reindexed the next time
   * they are checked.

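A hedged sketch of how a caller might drive the new pipeline-based lookup declared above; the ingester and specification are assumed to be supplied by the surrounding framework, and the identifier strings are placeholders:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.manifoldcf.agents.interfaces.*;
    import org.apache.manifoldcf.core.interfaces.ManifoldCFException;

    class PipelineLookupExample
    {
      static Map<OutputKey,DocumentIngestStatus> lookupOne(
        IIncrementalIngester ingester,
        IPipelineSpecificationBasic spec)
        throws ManifoldCFException
      {
        Map<OutputKey,DocumentIngestStatus> statuses =
          new HashMap<OutputKey,DocumentIngestStatus>();
        ingester.getPipelineDocumentIngestData(statuses, spec,
          "someIdentifierClass", "someIdentifierHash");
        // statuses now holds one entry per (document, output connection)
        // pair found in the ingest-status table.
        return statuses;
      }
    }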
Modified: manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineSpecification.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineSpecification.java?rev=1602613&r1=1602612&r2=1602613&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineSpecification.java (original)
+++ manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineSpecification.java Sat Jun 14 18:14:42 2014
@@ -23,10 +23,15 @@ import org.apache.manifoldcf.core.interf
 /** This interface describes a multi-output pipeline, where each stage has an already-computed
 * description string.
 */
-public interface IPipelineSpecification extends IPipelineSpecificationBasic
+public interface IPipelineSpecification
 {
   public static final String _rcsid = "@(#)$Id$";
 
+  /** Get the basic pipeline specification.
+  *@return the specification.
+  */
+  public IPipelineSpecificationBasic getBasicPipelineSpecification();
+  
   /** Get the description string for a pipeline stage.
   *@param stage is the stage to get the description string for.
   *@return the description string for that stage.

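With the extends relationship gone, an implementation now carries its basic specification by composition. A minimal sketch of that shape, assuming getStageDescriptionString is the interface's only other member (members outside this hunk, if any, would need implementing too):

    class DelegatingPipelineSpecification implements IPipelineSpecification
    {
      protected final IPipelineSpecificationBasic basic;
      protected final String[] stageDescriptions; // indexed by stage; illustrative

      DelegatingPipelineSpecification(IPipelineSpecificationBasic basic,
        String[] stageDescriptions)
      {
        this.basic = basic;
        this.stageDescriptions = stageDescriptions;
      }

      @Override
      public IPipelineSpecificationBasic getBasicPipelineSpecification()
      {
        return basic;
      }

      @Override
      public String getStageDescriptionString(int stage)
      {
        return stageDescriptions[stage];
      }
    }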
Modified: manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineSpecificationWithVersions.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineSpecificationWithVersions.java?rev=1602613&r1=1602612&r2=1602613&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineSpecificationWithVersions.java (original)
+++ manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineSpecificationWithVersions.java Sat Jun 14 18:14:42 2014
@@ -23,10 +23,15 @@ import org.apache.manifoldcf.core.interf
 /** This interface describes a multi-output pipeline, with existing document version information from
 * each output.
 */
-public interface IPipelineSpecificationWithVersions extends IPipelineSpecification
+public interface IPipelineSpecificationWithVersions
 {
   public static final String _rcsid = "@(#)$Id$";
 
+  /** Get pipeline specification.
+  *@return the pipeline specification.
+  */
+  public IPipelineSpecification getPipelineSpecification();
+  
   /** For a given output index, return a document version string.
   *@param index is the output index.
   *@return the document version string.

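The with-versions interface now reaches the basic specification in two hops, which is the traversal the IncrementalIngester changes above depend on. A small sketch:

    // From the versions-aware spec, through the pipeline spec, to the basic spec.
    static int countOutputs(IPipelineSpecificationWithVersions withVersions)
    {
      IPipelineSpecification pipelineSpec = withVersions.getPipelineSpecification();
      IPipelineSpecificationBasic basicSpec = pipelineSpec.getBasicPipelineSpecification();
      return basicSpec.getOutputCount();
    }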
Added: manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/OutputKey.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/OutputKey.java?rev=1602613&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/OutputKey.java (added)
+++ manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/OutputKey.java Sat Jun 14 18:14:42 2014
@@ -0,0 +1,79 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.agents.interfaces;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import java.util.*;
+
+/** This object functions as a key describing a unique output, consisting of:
+* - a document class
+* - a document ID hash
+* - an output connection name
+*/
+public class OutputKey
+{
+  public static final String _rcsid = "@(#)$Id$";
+
+  protected final String documentClass;
+  protected final String documentIDHash;
+  protected final String outputConnectionName;
+  
+  /** Constructor */
+  public OutputKey(String documentClass, String documentIDHash, String outputConnectionName)
+  {
+    // Identifying information
+    this.documentClass = documentClass;
+    this.documentIDHash = documentIDHash;
+    this.outputConnectionName = outputConnectionName;
+  }
+
+  /** Get the document class */
+  public String getDocumentClass()
+  {
+    return documentClass;
+  }
+  
+  /** Get the document ID hash */
+  public String getDocumentIDHash()
+  {
+    return documentIDHash;
+  }
+  
+  /** Get the output connection name */
+  public String getOutputConnectionName()
+  {
+    return outputConnectionName;
+  }
+  
+  public int hashCode()
+  {
+    return documentClass.hashCode() + documentIDHash.hashCode() + outputConnectionName.hashCode();
+  }
+  
+  public boolean equals(Object o)
+  {
+    if (!(o instanceof OutputKey))
+      return false;
+    OutputKey dis = (OutputKey)o;
+    return dis.documentClass.equals(documentClass) &&
+      dis.documentIDHash.equals(documentIDHash) &&
+      dis.outputConnectionName.equals(outputConnectionName);
+  }
+      
+}

Propchange: manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/OutputKey.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/OutputKey.java
------------------------------------------------------------------------------
    svn:keywords = Id

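(Editorial sketch: because OutputKey overrides equals() and hashCode() over all three identifying fields, two independently constructed keys with the same fields are interchangeable, which is what makes the class usable as a composite hash key. The map contents and field values below are invented for illustration.)

    import java.util.*;

    Map<OutputKey,String> versions = new HashMap<OutputKey,String>();
    versions.put(new OutputKey("myconn","0123abcd","Solr output"),"v1");
    // A separately constructed, field-equal key locates the same entry.
    String stored = versions.get(new OutputKey("myconn","0123abcd","Solr output"));
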
Modified: manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/DocumentDescription.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/DocumentDescription.java?rev=1602613&r1=1602612&r2=1602613&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/DocumentDescription.java (original)
+++ manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/DocumentDescription.java Sat Jun 14 18:14:42 2014
@@ -27,12 +27,12 @@ public class DocumentDescription
   public static final String _rcsid = "@(#)$Id: DocumentDescription.java 988245 2010-08-23 18:39:35Z kwright $";
 
   // Member variables
-  protected Long id;
-  protected Long jobID;
-  protected String documentIdentifierHash;
-  protected String documentIdentifier;
-  protected long failTime;
-  protected int failRetryCount;
+  protected final Long id;
+  protected final Long jobID;
+  protected final String documentIdentifierHash;
+  protected final String documentIdentifier;
+  protected final long failTime;
+  protected final int failRetryCount;
 
   /** Constructor.
   *@param id is the record id.

Modified: manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java?rev=1602613&r1=1602612&r2=1602613&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java (original)
+++ manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java Sat Jun 14 18:14:42 2014
@@ -110,7 +110,7 @@ public class DocumentCleanupThread exten
 
           IJobDescription job = dds.getJobDescription();
           String connectionName = job.getConnectionName();
-          String outputConnectionName = job.getOutputConnectionName();
+          IPipelineSpecificationBasic pipelineSpecificationBasic = new PipelineSpecificationBasic(job);
 
           try
           {
@@ -120,13 +120,12 @@ public class DocumentCleanupThread exten
             IRepositoryConnection connection = connMgr.load(connectionName);
             
             // This is where we store the hopcount cleanup data
-            ArrayList arrayDocHashes = new ArrayList();
-            ArrayList arrayDocsToDelete = new ArrayList();
-            ArrayList arrayRelationshipTypes = new ArrayList();
-            ArrayList hopcountMethods = new ArrayList();
+            List<String> arrayDocHashes = new ArrayList<String>();
+            List<CleanupQueuedDocument> arrayDocsToDelete = new ArrayList<CleanupQueuedDocument>();
+            List<String[]> arrayRelationshipTypes = new ArrayList<String[]>();
+            List<Integer> hopcountMethods = new ArrayList<Integer>();
 
-            int j = 0;
-            while (j < dds.getCount())
+            for (int j = 0; j < dds.getCount(); j++)
             {
               CleanupQueuedDocument dqd = dds.getDocument(j);
               DocumentDescription ddd = dqd.getDocumentDescription();
@@ -142,7 +141,6 @@ public class DocumentCleanupThread exten
                   hopcountMethods.add(new Integer(job.getHopcountMode()));
                 }
               }
-              j++;
             }
 
             // Grab one connection for each connectionName.  If we fail, nothing is lost and retries are possible.
@@ -154,18 +152,16 @@ public class DocumentCleanupThread exten
               boolean[] deleteFromQueue = new boolean[arrayDocHashes.size()];
                     
               // Count the number of docs to actually delete.  This will be a subset of the documents in the list.
-              int k = 0;
               int removeCount = 0;
-              while (k < arrayDocHashes.size())
+              for (int k = 0; k < arrayDocHashes.size(); k++)
               {
-                if (((CleanupQueuedDocument)arrayDocsToDelete.get(k)).shouldBeRemovedFromIndex())
+                if (arrayDocsToDelete.get(k).shouldBeRemovedFromIndex())
                 {
                   deleteFromQueue[k] = false;
                   removeCount++;
                 }
                 else
                   deleteFromQueue[k] = true;
-                k++;
               }
                     
               // Allocate removal arrays
@@ -173,33 +169,29 @@ public class DocumentCleanupThread exten
               String[] hashedDocsToRemove = new String[removeCount];
 
               // Now, iterate over the list
-              k = 0;
               removeCount = 0;
-              while (k < arrayDocHashes.size())
+              for (int k = 0; k < arrayDocHashes.size(); k++)
               {
-                if (((CleanupQueuedDocument)arrayDocsToDelete.get(k)).shouldBeRemovedFromIndex())
+                if (arrayDocsToDelete.get(k).shouldBeRemovedFromIndex())
                 {
                   docClassesToRemove[removeCount] = connectionName;
-                  hashedDocsToRemove[removeCount] = (String)arrayDocHashes.get(k);
+                  hashedDocsToRemove[removeCount] = arrayDocHashes.get(k);
                   removeCount++;
                 }
-                k++;
               }
                 
-              OutputRemoveActivity activities = new OutputRemoveActivity(connectionName,connMgr,outputConnectionName);
+              OutputRemoveActivity activities = new OutputRemoveActivity(connectionName,connMgr);
 
               // Finally, go ahead and delete the documents from the ingestion system.
 
               try
               {
-                ingester.documentDeleteMultiple(outputConnectionName,docClassesToRemove,hashedDocsToRemove,activities);
+                ingester.documentDeleteMultiple(pipelineSpecificationBasic,docClassesToRemove,hashedDocsToRemove,activities);
                 // Success!  Label all these as needing deletion from queue.
-                k = 0;
-                while (k < arrayDocHashes.size())
+                for (int k = 0; k < arrayDocHashes.size(); k++)
                 {
-                  if (((CleanupQueuedDocument)arrayDocsToDelete.get(k)).shouldBeRemovedFromIndex())
+                  if (arrayDocsToDelete.get(k).shouldBeRemovedFromIndex())
                     deleteFromQueue[k] = true;
-                  k++;
                 }
               }
               catch (ServiceInterruption e)
@@ -208,10 +200,9 @@ public class DocumentCleanupThread exten
                 // Go through the list of documents we just tried, and reset them on the queue based on the
                 // ServiceInterruption parameters.  Then we must proceed to delete ONLY the documents that
                 // were not part of the index deletion attempt.
-                k = 0;
-                while (k < arrayDocHashes.size())
+                for (int k = 0; k < arrayDocHashes.size(); k++)
                 {
-                  CleanupQueuedDocument cqd = (CleanupQueuedDocument)arrayDocsToDelete.get(k);
+                  CleanupQueuedDocument cqd = arrayDocsToDelete.get(k);
                   if (cqd.shouldBeRemovedFromIndex())
                   {
                     DocumentDescription dd = cqd.getDocumentDescription();
@@ -219,23 +210,21 @@ public class DocumentCleanupThread exten
                     jobManager.resetCleaningDocument(dd,e.getRetryTime());
                     cqd.setProcessed();
                   }
-                  k++;
                 }
               }
 
               // Successfully deleted some documents from ingestion system.  Now, remove them from job queue.  This
               // must currently happen one document at a time, because the jobs and connectors for each document
               // potentially differ.
-              k = 0;
-              while (k < arrayDocHashes.size())
+              for (int k = 0; k < arrayDocHashes.size(); k++)
               {
                 if (deleteFromQueue[k])
                 {
-                  DeleteQueuedDocument dqd = (DeleteQueuedDocument)arrayDocsToDelete.get(k);
+                  CleanupQueuedDocument dqd = arrayDocsToDelete.get(k);
                   DocumentDescription ddd = dqd.getDocumentDescription();
                   Long jobID = ddd.getJobID();
-                  int hopcountMethod = ((Integer)hopcountMethods.get(k)).intValue();
-                  String[] legalLinkTypes = (String[])arrayRelationshipTypes.get(k);
+                  int hopcountMethod = hopcountMethods.get(k).intValue();
+                  String[] legalLinkTypes = arrayRelationshipTypes.get(k);
                   DocumentDescription[] requeueCandidates = jobManager.markDocumentCleanedUp(jobID,legalLinkTypes,ddd,hopcountMethod);
                   // Use the common method for doing the requeuing
                   ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,
@@ -243,7 +232,6 @@ public class DocumentCleanupThread exten
                   // Finally, completed expiration of the document.
                   dqd.setProcessed();
                 }
-                k++;
               }
             }
             finally
@@ -349,18 +337,15 @@ public class DocumentCleanupThread exten
   {
 
     // Connection name
-    protected String connectionName;
+    protected final String connectionName;
     // Connection manager
-    protected IRepositoryConnectionManager connMgr;
-    // Output connection name
-    protected String outputConnectionName;
+    protected final IRepositoryConnectionManager connMgr;
 
     /** Constructor */
-    public OutputRemoveActivity(String connectionName, IRepositoryConnectionManager connMgr, String outputConnectionName)
+    public OutputRemoveActivity(String connectionName, IRepositoryConnectionManager connMgr)
     {
       this.connectionName = connectionName;
       this.connMgr = connMgr;
-      this.outputConnectionName = outputConnectionName;
     }
 
     /** Record time-stamped information about the activity of the output connector.
@@ -382,7 +367,7 @@ public class DocumentCleanupThread exten
       String entityURI, String resultCode, String resultDescription)
       throws ManifoldCFException
     {
-      connMgr.recordHistory(connectionName,startTime,ManifoldCF.qualifyOutputActivityName(activityType,outputConnectionName),dataSize,entityURI,resultCode,
+      connMgr.recordHistory(connectionName,startTime,activityType,dataSize,entityURI,resultCode,
         resultDescription,null);
     }
   }

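(Editorial sketch: the net effect on the delete path is that documentDeleteMultiple is now keyed on the job's pipeline rather than on a single output connection name, and the activity logger no longer qualifies activity names per output. A condensed before/after drawn from the hunks above; variable names are those used in this file.)

    // Before this commit:
    // ingester.documentDeleteMultiple(outputConnectionName,docClassesToRemove,hashedDocsToRemove,activities);

    // After: the basic pipeline specification, built from the job, carries all outputs.
    IPipelineSpecificationBasic pipelineSpecificationBasic = new PipelineSpecificationBasic(job);
    ingester.documentDeleteMultiple(pipelineSpecificationBasic,docClassesToRemove,hashedDocsToRemove,activities);
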
Modified: manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteThread.java?rev=1602613&r1=1602612&r2=1602613&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteThread.java (original)
+++ manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteThread.java Sat Jun 14 18:14:42 2014
@@ -100,8 +100,8 @@ public class DocumentDeleteThread extend
           
           IJobDescription job = dds.getJobDescription();
           String connectionName = job.getConnectionName();
-          String outputConnectionName = job.getOutputConnectionName();
-
+          IPipelineSpecificationBasic pipelineSpecificationBasic = new PipelineSpecificationBasic(job);
+          
           try
           {
             // Do the delete work.
@@ -115,8 +115,7 @@ public class DocumentDeleteThread extend
             String[] docClassesToRemove = new String[dds.getCount()];
             String[] hashedDocsToRemove = new String[dds.getCount()];
             DeleteQueuedDocument[] docsToDelete = new DeleteQueuedDocument[dds.getCount()];
-            int j = 0;
-            while (j < dds.getCount())
+            for (int j = 0; j < dds.getCount(); j++)
             {
               DeleteQueuedDocument dqd = dds.getDocument(j);
               DocumentDescription ddd = dqd.getDocumentDescription();
@@ -124,19 +123,16 @@ public class DocumentDeleteThread extend
               hashedDocsToRemove[j] = ddd.getDocumentIdentifierHash();
               docsToDelete[j] = dqd;
               deleteFromQueue[j] = false;
-              j++;
             }
                 
-            OutputRemoveActivity logger = new OutputRemoveActivity(connectionName,connMgr,outputConnectionName);
+            OutputRemoveActivity logger = new OutputRemoveActivity(connectionName,connMgr);
                 
             try
             {
-              ingester.documentDeleteMultiple(outputConnectionName,docClassesToRemove,hashedDocsToRemove,logger);
-              j = 0;
-              while (j < dds.getCount())
+              ingester.documentDeleteMultiple(pipelineSpecificationBasic,docClassesToRemove,hashedDocsToRemove,logger);
+              for (int j = 0; j < dds.getCount(); j++)
               {
                 deleteFromQueue[j] = true;
-                j++;
               }
             }
             catch (ServiceInterruption e)
@@ -145,46 +141,38 @@ public class DocumentDeleteThread extend
               // Go through the list of documents we just tried, and reset them on the queue based on the
               // ServiceInterruption parameters.  Then we must proceed to delete ONLY the documents that
               // were not part of the index deletion attempt.
-              j = 0;
-              while (j < dds.getCount())
+              for (int j = 0; j < dds.getCount(); j++)
               {
                 DeleteQueuedDocument cqd = docsToDelete[j];
                 DocumentDescription dd = cqd.getDocumentDescription();
                 // To recover from an expiration failure, requeue the document to COMPLETED etc.
                 jobManager.resetDeletingDocument(dd,e.getRetryTime());
                 cqd.setProcessed();
-                j++;
               }
             }
 
             // Count the records we're actually going to delete
             int recordCount = 0;
-            j = 0;
-            while (j < dds.getCount())
+            for (int j = 0; j < dds.getCount(); j++)
             {
               if (deleteFromQueue[j])
                 recordCount++;
-              j++;
             }
                 
             // Delete the records
             DocumentDescription[] deleteDescriptions = new DocumentDescription[recordCount];
-            j = 0;
             recordCount = 0;
-            while (j < dds.getCount())
+            for (int j = 0; j < dds.getCount(); j++)
             {
               if (deleteFromQueue[j])
                 deleteDescriptions[recordCount++] = docsToDelete[j].getDocumentDescription();
-              j++;
             }
             jobManager.deleteIngestedDocumentIdentifiers(deleteDescriptions);
             // Mark them as gone
-            j = 0;
-            while (j < dds.getCount())
+            for (int j = 0; j < dds.getCount(); j++)
             {
               if (deleteFromQueue[j])
                 docsToDelete[j].wasProcessed();
-              j++;
             }
             // Go around again
           }
@@ -193,10 +181,9 @@ public class DocumentDeleteThread extend
             // Here we should take steps to insure that the documents that have been handed to us
             // are dealt with appropriately.  This may involve setting the document state to "complete"
             // so that they will be picked up again.
-            int j = 0;
-            while (j < dds.getCount())
+            for (int j = 0; j < dds.getCount(); j++)
             {
-              DeleteQueuedDocument dqd = dds.getDocument(j++);
+              DeleteQueuedDocument dqd = dds.getDocument(j);
 
               if (dqd.wasProcessed() == false)
               {
@@ -272,19 +259,16 @@ public class DocumentDeleteThread extend
   /** The OutputRemoveActivity class */
   protected static class OutputRemoveActivity implements IOutputRemoveActivity
   {
-    // Connection name
-    protected String connectionName;
     // Connection manager
-    protected IRepositoryConnectionManager connMgr;
+    protected final IRepositoryConnectionManager connMgr;
-    // Output connection name
-    protected String outputConnectionName;
+    // Connection name
+    protected final String connectionName;
 
     /** Constructor */
-    public OutputRemoveActivity(String connectionName, IRepositoryConnectionManager connMgr, String outputConnectionName)
+    public OutputRemoveActivity(String connectionName, IRepositoryConnectionManager connMgr)
     {
       this.connectionName = connectionName;
       this.connMgr = connMgr;
-      this.outputConnectionName = outputConnectionName;
     }
 
     /** Record time-stamped information about the activity of the output connector.
@@ -306,7 +290,7 @@ public class DocumentDeleteThread extend
       String entityURI, String resultCode, String resultDescription)
       throws ManifoldCFException
     {
-      connMgr.recordHistory(connectionName,startTime,ManifoldCF.qualifyOutputActivityName(activityType,outputConnectionName),dataSize,entityURI,resultCode,
+      connMgr.recordHistory(connectionName,startTime,activityType,dataSize,entityURI,resultCode,
         resultDescription,null);
     }
   }

Modified: manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java?rev=1602613&r1=1602612&r2=1602613&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java (original)
+++ manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java Sat Jun 14 18:14:42 2014
@@ -100,7 +100,7 @@ public class ExpireThread extends Thread
 
           IJobDescription job = dds.getJobDescription();
           String connectionName = job.getConnectionName();
-          String outputConnectionName = job.getOutputConnectionName();
+          IPipelineSpecificationBasic pipelineSpecificationBasic = new PipelineSpecificationBasic(job);
           
           try
           {
@@ -112,10 +112,10 @@ public class ExpireThread extends Thread
             IRepositoryConnection connection = connMgr.load(connectionName);
             
             // This is where we store the hopcount cleanup data
-            ArrayList arrayDocHashes = new ArrayList();
-            ArrayList arrayDocsToDelete = new ArrayList();
-            ArrayList arrayRelationshipTypes = new ArrayList();
-            ArrayList hopcountMethods = new ArrayList();
+            List<String> arrayDocHashes = new ArrayList<String>();
+            List<CleanupQueuedDocument> arrayDocsToDelete = new ArrayList<CleanupQueuedDocument>();
+            List<String[]> arrayRelationshipTypes = new ArrayList<String[]>();
+            List<Integer> hopcountMethods = new ArrayList<Integer>();
             
             int j = 0;
             while (j < dds.getCount())
@@ -146,18 +146,16 @@ public class ExpireThread extends Thread
               boolean[] deleteFromQueue = new boolean[arrayDocHashes.size()];
                     
               // Count the number of docs to actually delete.  This will be a subset of the documents in the list.
-              int k = 0;
               int removeCount = 0;
-              while (k < arrayDocHashes.size())
+              for (int k = 0; k < arrayDocHashes.size(); k++)
               {
-                if (((CleanupQueuedDocument)arrayDocsToDelete.get(k)).shouldBeRemovedFromIndex())
+                if (arrayDocsToDelete.get(k).shouldBeRemovedFromIndex())
                 {
                   deleteFromQueue[k] = false;
                   removeCount++;
                 }
                 else
                   deleteFromQueue[k] = true;
-                k++;
               }
                     
               // Allocate removal arrays
@@ -165,33 +163,29 @@ public class ExpireThread extends Thread
               String[] hashedDocsToRemove = new String[removeCount];
 
               // Now, iterate over the list
-              k = 0;
               removeCount = 0;
-              while (k < arrayDocHashes.size())
+              for (int k = 0; k < arrayDocHashes.size(); k++)
               {
-                if (((CleanupQueuedDocument)arrayDocsToDelete.get(k)).shouldBeRemovedFromIndex())
+                if (arrayDocsToDelete.get(k).shouldBeRemovedFromIndex())
                 {
                   docClassesToRemove[removeCount] = connectionName;
-                  hashedDocsToRemove[removeCount] = (String)arrayDocHashes.get(k);
+                  hashedDocsToRemove[removeCount] = arrayDocHashes.get(k);
                   removeCount++;
                 }
-                k++;
               }
 
-              OutputRemoveActivity activities = new OutputRemoveActivity(connectionName,connMgr,outputConnectionName);
+              OutputRemoveActivity activities = new OutputRemoveActivity(connectionName,connMgr);
 
               // Finally, go ahead and delete the documents from the ingestion system.
               // If we fail, we need to put the documents back on the queue.
               try
               {
-                ingester.documentDeleteMultiple(outputConnectionName,docClassesToRemove,hashedDocsToRemove,activities);
+                ingester.documentDeleteMultiple(pipelineSpecificationBasic,docClassesToRemove,hashedDocsToRemove,activities);
                 // Success!  Label all these as needing deletion from queue.
-                k = 0;
-                while (k < arrayDocHashes.size())
+                for (int k = 0; k < arrayDocHashes.size(); k++)
                 {
-                  if (((CleanupQueuedDocument)arrayDocsToDelete.get(k)).shouldBeRemovedFromIndex())
+                  if (arrayDocsToDelete.get(k).shouldBeRemovedFromIndex())
                     deleteFromQueue[k] = true;
-                  k++;
                 }
               }
               catch (ServiceInterruption e)
@@ -200,10 +194,9 @@ public class ExpireThread extends Thread
                 // Go through the list of documents we just tried, and reset them on the queue based on the
                 // ServiceInterruption parameters.  Then we must proceed to delete ONLY the documents that
                 // were not part of the index deletion attempt.
-                k = 0;
-                while (k < arrayDocHashes.size())
+                for (int k = 0; k < arrayDocHashes.size(); k++)
                 {
-                  CleanupQueuedDocument cqd = (CleanupQueuedDocument)arrayDocsToDelete.get(k);
+                  CleanupQueuedDocument cqd = arrayDocsToDelete.get(k);
                   if (cqd.shouldBeRemovedFromIndex())
                   {
                     DocumentDescription dd = cqd.getDocumentDescription();
@@ -222,23 +215,21 @@ public class ExpireThread extends Thread
                       cqd.setProcessed();
                     }
                   }
-                  k++;
                 }
               }
 
               // Successfully deleted some documents from ingestion system.  Now, remove them from job queue.  This
               // must currently happen one document at a time, because the jobs and connectors for each document
               // potentially differ.
-              k = 0;
-              while (k < arrayDocHashes.size())
+              for (int k = 0; k < arrayDocHashes.size(); k++)
               {
                 if (deleteFromQueue[k])
                 {
-                  CleanupQueuedDocument dqd = (CleanupQueuedDocument)arrayDocsToDelete.get(k);
+                  CleanupQueuedDocument dqd = arrayDocsToDelete.get(k);
                   DocumentDescription ddd = dqd.getDocumentDescription();
                   Long jobID = ddd.getJobID();
-                  int hopcountMethod = ((Integer)hopcountMethods.get(k)).intValue();
-                  String[] legalLinkTypes = (String[])arrayRelationshipTypes.get(k);
+                  int hopcountMethod = hopcountMethods.get(k).intValue();
+                  String[] legalLinkTypes = arrayRelationshipTypes.get(k);
                   DocumentDescription[] requeueCandidates = jobManager.markDocumentExpired(jobID,legalLinkTypes,ddd,hopcountMethod);
                   // Use the common method for doing the requeuing
                   ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,
@@ -246,7 +237,6 @@ public class ExpireThread extends Thread
                   // Finally, completed expiration of the document.
                   dqd.setProcessed();
                 }
-                k++;
               }
             }
             finally
@@ -352,18 +342,15 @@ public class ExpireThread extends Thread
   {
 
     // Connection name
-    protected String connectionName;
+    protected final String connectionName;
     // Connection manager
-    protected IRepositoryConnectionManager connMgr;
-    // Output connection name
-    protected String outputConnectionName;
+    protected final IRepositoryConnectionManager connMgr;
 
     /** Constructor */
-    public OutputRemoveActivity(String connectionName, IRepositoryConnectionManager connMgr, String outputConnectionName)
+    public OutputRemoveActivity(String connectionName, IRepositoryConnectionManager connMgr)
     {
       this.connectionName = connectionName;
       this.connMgr = connMgr;
-      this.outputConnectionName = outputConnectionName;
     }
 
     /** Record time-stamped information about the activity of the output connector.
@@ -385,7 +372,7 @@ public class ExpireThread extends Thread
       String entityURI, String resultCode, String resultDescription)
       throws ManifoldCFException
     {
-      connMgr.recordHistory(connectionName,startTime,ManifoldCF.qualifyOutputActivityName(activityType,outputConnectionName),dataSize,entityURI,resultCode,
+      connMgr.recordHistory(connectionName,startTime,activityType,dataSize,entityURI,resultCode,
         resultDescription,null);
     }
   }

Added: manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecification.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecification.java?rev=1602613&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecification.java (added)
+++ manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecification.java Sat Jun 14 18:14:42 2014
@@ -0,0 +1,71 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.crawler.system;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.agents.interfaces.*;
+import org.apache.manifoldcf.crawler.interfaces.*;
+
+/** Class which handles pipeline specifications that include current (new) description strings.
+*/
+public class PipelineSpecification implements IPipelineSpecification
+{
+  protected final IPipelineSpecificationBasic basicSpecification;
+  protected final String[] pipelineDescriptionStrings;
+    
+  public PipelineSpecification(IPipelineSpecificationBasic basicSpecification, IJobDescription job, IIncrementalIngester ingester)
+    throws ManifoldCFException, ServiceInterruption
+  {
+    this.basicSpecification = basicSpecification;
+    this.pipelineDescriptionStrings = new String[basicSpecification.getStageCount()];
+    for (int i = 0; i < pipelineDescriptionStrings.length; i++)
+    {
+      // Note: this needs to change when output connections become part of the pipeline
+      String descriptionString;
+      if (basicSpecification.checkStageOutputConnection(i))
+      {
+        descriptionString = ingester.getOutputDescription(basicSpecification.getStageConnectionName(i),job.getOutputSpecification());
+      }
+      else
+      {
+        descriptionString = ingester.getTransformationDescription(basicSpecification.getStageConnectionName(i),job.getPipelineStageSpecification(i));
+      }
+      this.pipelineDescriptionStrings[i] = descriptionString;
+    }
+  }
+
+  /** Get the basic pipeline specification.
+  *@return the specification.
+  */
+  @Override
+  public IPipelineSpecificationBasic getBasicPipelineSpecification()
+  {
+    return basicSpecification;
+  }
+
+  /** Get the description string for a pipeline stage.
+  *@param stage is the stage to get the description string for.
+  *@return the description string for that stage.
+  */
+  @Override
+  public String getStageDescriptionString(int stage)
+  {
+    return pipelineDescriptionStrings[stage];
+  }
+}
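
(Editorial sketch: a plausible end-to-end use of the new class, assuming a worker thread already holds the job description and an IIncrementalIngester handle; variable names are hypothetical and checked-exception handling is elided.)

    IPipelineSpecificationBasic basic = new PipelineSpecificationBasic(job);
    // Constructor precomputes one description string per stage
    // (throws ManifoldCFException, ServiceInterruption).
    IPipelineSpecification spec = new PipelineSpecification(basic,job,ingester);
    for (int stage = 0; stage < spec.getBasicPipelineSpecification().getStageCount(); stage++)
    {
      String description = spec.getStageDescriptionString(stage);
      // ... compare against stored versions, decide whether to reindex ...
    }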