You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/07/20 17:20:43 UTC

svn commit: r1612102 - in /manifoldcf/trunk: ./ framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/ framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/ framework/pull-agent/src/main/java/org/apache/manifol...

Author: kwright
Date: Sun Jul 20 15:20:42 2014
New Revision: 1612102

URL: http://svn.apache.org/r1612102
Log:
Fix for CONNECTORS-989.  Introduce component document concept.

Added:
    manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/DocumentIngestStatusSet.java
      - copied unchanged from r1612101, manifoldcf/branches/CONNECTORS-989/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/DocumentIngestStatusSet.java
Modified:
    manifoldcf/trunk/   (props changed)
    manifoldcf/trunk/CHANGES.txt
    manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
    manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java
    manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IngestStatuses.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IExistingVersions.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IProcessActivity.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecificationWithVersions.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/QueuedDocument.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java
    manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java

Propchange: manifoldcf/trunk/
------------------------------------------------------------------------------
  Merged /manifoldcf/branches/CONNECTORS-989:r1611600-1612101

Modified: manifoldcf/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/CHANGES.txt?rev=1612102&r1=1612101&r2=1612102&view=diff
==============================================================================
--- manifoldcf/trunk/CHANGES.txt (original)
+++ manifoldcf/trunk/CHANGES.txt Sun Jul 20 15:20:42 2014
@@ -3,6 +3,11 @@ $Id$
 
 ======================= 1.7-dev =====================
 
+CONNECTORS-989: Introduce document sub-components, which
+is a way of having multiple indexed documents corresponding to a
+single repository document.
+(Matteo Grolla, Karl Wright)
+
 CONNECTORS-994: Make Alfresco connector pay attention to the
 scanOnly flag.
 (Prasad Perera, Karl Wright)

Modified: manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java?rev=1612102&r1=1612101&r2=1612102&view=diff
==============================================================================
--- manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java (original)
+++ manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java Sun Jul 20 15:20:42 2014
@@ -64,6 +64,7 @@ public class IncrementalIngester extends
   protected final static String idField = "id";
   protected final static String outputConnNameField = "connectionname";
   protected final static String docKeyField = "dockey";
+  protected final static String componentHashField = "componenthash";
   protected final static String docURIField = "docuri";
   protected final static String uriHashField = "urihash";
   protected final static String lastVersionField = "lastversion";
@@ -122,6 +123,7 @@ public class IncrementalIngester extends
         map.put(idField,new ColumnDescription("BIGINT",true,false,null,null,false));
         map.put(outputConnNameField,new ColumnDescription("VARCHAR(32)",false,false,outputConnectionTableName,outputConnectionNameField,false));
         map.put(docKeyField,new ColumnDescription("VARCHAR(73)",false,false,null,null,false));
+        map.put(componentHashField,new ColumnDescription("VARCHAR(40)",false,true,null,null,false));
         // The document URI field, if null, indicates that the document was not actually ingested!
         // This happens when a connector wishes to keep track of a version string, but not actually ingest the doc.
         map.put(docURIField,new ColumnDescription("LONGTEXT",false,true,null,null,false));
@@ -156,10 +158,18 @@ public class IncrementalIngester extends
           performAlter(addMap,null,null,null);
         }
 
+        cd = (ColumnDescription)existing.get(componentHashField);
+        if (cd == null)
+        {
+          Map<String,ColumnDescription> addMap = new HashMap<String,ColumnDescription>();
+          addMap.put(componentHashField,new ColumnDescription("VARCHAR(40)",false,true,null,null,false));
+          performAlter(addMap,null,null,null);
+        }
+
       }
 
       // Now, do indexes
-      IndexDescription keyIndex = new IndexDescription(true,new String[]{docKeyField,outputConnNameField});
+      IndexDescription keyIndex = new IndexDescription(true,new String[]{docKeyField,outputConnNameField,componentHashField});
       IndexDescription uriHashIndex = new IndexDescription(false,new String[]{uriHashField,outputConnNameField});
       IndexDescription outputConnIndex = new IndexDescription(false,new String[]{outputConnNameField});
 
@@ -228,7 +238,20 @@ public class IncrementalIngester extends
     int count = pipelineSpecificationBasic.getOutputCount();
     if (count == 0)
       return null;
-    return pipelineSpecificationBasic.getStageConnectionName(count-1);
+    return pipelineSpecificationBasic.getStageConnectionName(pipelineSpecificationBasic.getOutputStage(count-1));
+  }
+
+  /** From a pipeline specification, get the name of the output connection that will be indexed first
+  * in the pipeline.
+  *@param pipelineSpecificationBasic is the basic pipeline specification.
+  *@return the first indexed output connection name.
+  */
+  @Override
+  public String getFirstIndexedOutputConnectionName(IPipelineSpecificationBasic pipelineSpecificationBasic)
+  {
+    if (pipelineSpecificationBasic.getOutputCount() == 0)
+      return null;
+    return pipelineSpecificationBasic.getStageConnectionName(pipelineSpecificationBasic.getOutputStage(0));
   }
 
   /** Check if a mime type is indexable.
@@ -612,13 +635,14 @@ public class IncrementalIngester extends
   *@param pipelineSpecificationBasic is the basic pipeline specification needed.
   *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
   *@param identifierHash is the hashed document identifier.
+  *@param componentHash is the hashed component identifier, if any.
   *@param documentVersion is the document version.
   *@param recordTime is the time at which the recording took place, in milliseconds since epoch.
   */
   @Override
   public void documentRecord(
     IPipelineSpecificationBasic pipelineSpecificationBasic,
-    String identifierClass, String identifierHash,
+    String identifierClass, String identifierHash, String componentHash,
     String documentVersion, long recordTime)
     throws ManifoldCFException
   {
@@ -640,7 +664,7 @@ public class IncrementalIngester extends
 
     if (Logging.ingest.isDebugEnabled())
     {
-      Logging.ingest.debug("Recording document '"+docKey+"' for output connections '"+outputConnectionNames+"'");
+      Logging.ingest.debug("Recording document '"+docKey+"' component hash "+((componentHash==null)?"(None)":("'"+componentHash+"'"))+" for output connections '"+outputConnectionNames+"'");
     }
 
     for (int k = 0; k < outputConnectionNames.length; k++)
@@ -649,7 +673,7 @@ public class IncrementalIngester extends
 
       // If we get here, it means we are noting that the document was examined, but that no change was required.  This is signaled
       // to noteDocumentIngest by having the null documentURI.
-      noteDocumentIngest(outputConnectionName,docKey,documentVersion,null,null,null,null,recordTime,null,null);
+      noteDocumentIngest(outputConnectionName,docKey,componentHash,documentVersion,null,null,null,null,recordTime,null,null);
     }
   }
 
@@ -661,6 +685,7 @@ public class IncrementalIngester extends
   *@param pipelineSpecificationWithVersions is the pipeline specification with already-fetched output versioning information.
   *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
   *@param identifierHash is the hashed document identifier.
+  *@param componentHash is the hashed component identifier, if any.
   *@param documentVersion is the document version.
   *@param parameterVersion is the version string for the forced parameters.
   *@param authorityName is the name of the authority associated with the document, if any.
@@ -670,7 +695,7 @@ public class IncrementalIngester extends
   @Override
   public void documentNoData(
     IPipelineSpecificationWithVersions pipelineSpecificationWithVersions,
-    String identifierClass, String identifierHash,
+    String identifierClass, String identifierHash, String componentHash,
     String documentVersion,
     String parameterVersion,
     String authorityName,
@@ -684,7 +709,7 @@ public class IncrementalIngester extends
 
     if (Logging.ingest.isDebugEnabled())
     {
-      Logging.ingest.debug("Logging empty document '"+docKey+"' into output connections '"+extractOutputConnectionNames(pipelineSpecificationWithVersions.getPipelineSpecification().getBasicPipelineSpecification())+"'");
+      Logging.ingest.debug("Logging empty document '"+docKey+"' component hash "+((componentHash==null)?"(None)":("'"+componentHash+"'"))+" into output connections '"+extractOutputConnectionNames(pipelineSpecificationWithVersions.getPipelineSpecification().getBasicPipelineSpecification())+"'");
     }
 
     // Set up a pipeline
@@ -694,7 +719,7 @@ public class IncrementalIngester extends
       throw new ServiceInterruption("Pipeline connector not installed",0L);
     try
     {
-      pipeline.noDocument(docKey,documentVersion,parameterVersion,authorityName,activities,recordTime);
+      pipeline.noDocument(docKey,componentHash,documentVersion,parameterVersion,authorityName,activities,recordTime);
     }
     finally
     {
@@ -710,6 +735,7 @@ public class IncrementalIngester extends
   *@param pipelineSpecificationWithVersions is the pipeline specification with already-fetched output versioning information.
   *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
   *@param identifierHash is the hashed document identifier.
+  *@param componentHash is the hashed component identifier, if any.
   *@param documentVersion is the document version.
   *@param parameterVersion is the version string for the forced parameters.
   *@param authorityName is the name of the authority associated with the document, if any.
@@ -723,11 +749,11 @@ public class IncrementalIngester extends
   @Override
   public boolean documentIngest(
     IPipelineSpecificationWithVersions pipelineSpecificationWithVersions,
-    String identifierClass, String identifierHash,
+    String identifierClass, String identifierHash, String componentHash,
     String documentVersion,
     String parameterVersion,
     String authorityName,
-    RepositoryDocument document,
+    RepositoryDocument data,
     long ingestTime, String documentURI,
     IOutputActivity activities)
     throws ManifoldCFException, ServiceInterruption, IOException
@@ -738,11 +764,11 @@ public class IncrementalIngester extends
 
     if (Logging.ingest.isDebugEnabled())
     {
-      Logging.ingest.debug("Ingesting document '"+docKey+"' into output connections '"+extractOutputConnectionNames(pipelineSpecificationWithVersions.getPipelineSpecification().getBasicPipelineSpecification())+"'");
+      Logging.ingest.debug("Ingesting document '"+docKey+"' component hash "+((componentHash==null)?"(None)":("'"+componentHash+"'"))+" into output connections '"+extractOutputConnectionNames(pipelineSpecificationWithVersions.getPipelineSpecification().getBasicPipelineSpecification())+"'");
     }
 
     // Set indexing date
-    document.setIndexingDate(new Date());
+    data.setIndexingDate(new Date());
     
     // Set up a pipeline
     PipelineObjectWithVersions pipeline = pipelineGrabWithVersions(pipelineConnectionsWithVersions);
@@ -751,7 +777,7 @@ public class IncrementalIngester extends
       throw new ServiceInterruption("Pipeline connector not installed",0L);
     try
     {
-      return pipeline.addOrReplaceDocumentWithException(docKey,documentURI,document,documentVersion,parameterVersion,authorityName,activities,ingestTime) == IPipelineConnector.DOCUMENTSTATUS_ACCEPTED;
+      return pipeline.addOrReplaceDocumentWithException(docKey,componentHash,documentURI,data,documentVersion,parameterVersion,authorityName,activities,ingestTime) == IPipelineConnector.DOCUMENTSTATUS_ACCEPTED;
     }
     finally
     {
@@ -759,6 +785,27 @@ public class IncrementalIngester extends
     }
   }
 
+  /** Remove a document component from the search engine index.
+  *@param pipelineSpecificationBasic is the basic pipeline specification.
+  *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
+  *@param identifierHash is the hash of the id of the document.
+  *@param componentHash is the hashed component identifier, if any.
+  *@param activities is the object to use to log the details of the ingestion attempt.  May be null.
+  */
+  @Override
+  public void documentRemove(
+    IPipelineSpecificationBasic pipelineSpecificationBasic,
+    String identifierClass, String identifierHash, String componentHash,
+    IOutputRemoveActivity activities)
+    throws ManifoldCFException, ServiceInterruption
+  {
+    documentRemoveMultiple(pipelineSpecificationBasic,
+      new String[]{identifierClass},
+      new String[]{identifierHash},
+      componentHash,
+      activities);
+  }
+
   protected static String[] extractOutputConnectionNames(IPipelineSpecificationBasic pipelineSpecificationBasic)
   {
     String[] rval = new String[pipelineSpecificationBasic.getOutputCount()];
@@ -1151,6 +1198,218 @@ public class IncrementalIngester extends
     }
   }
 
+  /** Remove multiple document components from the search engine index.
+  *@param pipelineSpecificationBasic is the basic pipeline specification.
+  *@param identifierClasses are the names of the spaces in which the identifier hash should be interpreted.
+  *@param identifierHashes are the hashes of the ids of the documents.
+  *@param componentHash is the hashed component identifier, if any.
+  *@param activities is the object to use to log the details of the ingestion attempt.  May be null.
+  */
+  @Override
+  public void documentRemoveMultiple(
+    IPipelineSpecificationBasic pipelineSpecificationBasic,
+    String[] identifierClasses, String[] identifierHashes, String componentHash,
+    IOutputRemoveActivity activities)
+    throws ManifoldCFException, ServiceInterruption
+  {
+    String[] outputConnectionNames = extractOutputConnectionNames(pipelineSpecificationBasic);
+    // Load connection managers up front to save time
+    IOutputConnection[] outputConnections = connectionManager.loadMultiple(outputConnectionNames);
+    
+    // No transactions here, so we can cycle through the connection names one at a time
+    for (int z = 0; z < outputConnectionNames.length; z++)
+    {
+      String outputConnectionName = outputConnectionNames[z];
+      IOutputConnection connection = outputConnections[z];
+
+      activities = new OutputRemoveActivitiesWrapper(activities,outputConnectionName);
+
+      if (Logging.ingest.isDebugEnabled())
+      {
+        for (int i = 0; i < identifierHashes.length; i++)
+        {
+          Logging.ingest.debug("Request to remove document '"+makeKey(identifierClasses[i],identifierHashes[i])+"' component hash "+((componentHash==null)?"(None)":("'"+componentHash+"'"))+" from output connection '"+outputConnectionName+"'");
+        }
+      }
+
+      // No transactions.  Time for the operation may exceed transaction timeout.
+
+      // Obtain the current URIs of all of these.
+      DeleteInfo[] uris = getDocumentURIMultiple(outputConnectionName,identifierClasses,identifierHashes,componentHash);
+
+      // Grab critical section locks so that we can't attempt to ingest at the same time we are deleting.
+      // (This guarantees that when this operation is complete the database reflects reality.)
+      int validURIcount = 0;
+      for (int i = 0; i < uris.length; i++)
+      {
+        if (uris[i] != null && uris[i].getURI() != null)
+          validURIcount++;
+      }
+      String[] lockArray = new String[validURIcount];
+      String[] validURIArray = new String[validURIcount];
+      validURIcount = 0;
+      for (int i = 0; i < uris.length; i++)
+      {
+        if (uris[i] != null && uris[i].getURI() != null)
+        {
+          validURIArray[validURIcount] = uris[i].getURI();
+          lockArray[validURIcount] = outputConnectionName+":"+validURIArray[validURIcount];
+          validURIcount++;
+        }
+      }
+
+      lockManager.enterLocks(null,null,lockArray);
+      try
+      {
+        // Fetch the document URIs for the listed documents
+        for (int i = 0; i < uris.length; i++)
+        {
+          if (uris[i] != null && uris[i].getURI() != null)
+            removeDocument(connection,uris[i].getURI(),uris[i].getOutputVersion(),activities);
+        }
+
+        // Now, get rid of all rows that match the given uris.
+        // Do the queries together, then the deletes
+        beginTransaction();
+        try
+        {
+          // The basic process is this:
+          // 1) Come up with a set of urihash values
+          // 2) Find the matching, corresponding id values
+          // 3) Delete the rows corresponding to the id values, in sequence
+
+          // Process (1 & 2) has to be broken down into chunks that contain the maximum
+          // number of doc hash values each.  We need to avoid repeating doc hash values,
+          // so the first step is to come up with ALL the doc hash values before looping
+          // over them.
+
+          int maxClauses;
+          
+          // Find all the documents that match this set of URIs
+          Set<String> docURIHashValues = new HashSet<String>();
+          Set<String> docURIValues = new HashSet<String>();
+          for (String docDBString : validURIArray)
+          {
+            String docDBHashString = ManifoldCF.hash(docDBString);
+            docURIValues.add(docDBString);
+            docURIHashValues.add(docDBHashString);
+          }
+
+          // Now, perform n queries, each of them no larger the maxInClause in length.
+          // Create a list of row id's from this.
+          Set<Long> rowIDSet = new HashSet<Long>();
+          Iterator<String> iter = docURIHashValues.iterator();
+          int j = 0;
+          List<String> hashList = new ArrayList<String>();
+          maxClauses = maxClausesRowIdsForURIs(outputConnectionName);
+          while (iter.hasNext())
+          {
+            if (j == maxClauses)
+            {
+              findRowIdsForURIs(outputConnectionName,rowIDSet,docURIValues,hashList);
+              hashList.clear();
+              j = 0;
+            }
+            hashList.add(iter.next());
+            j++;
+          }
+
+          if (j > 0)
+            findRowIdsForURIs(outputConnectionName,rowIDSet,docURIValues,hashList);
+
+          // Next, go through the list of row IDs, and delete them in chunks
+          j = 0;
+          List<Long> list = new ArrayList<Long>();
+          Iterator<Long> iter2 = rowIDSet.iterator();
+          maxClauses = maxClausesDeleteRowIds();
+          while (iter2.hasNext())
+          {
+            if (j == maxClauses)
+            {
+              deleteRowIds(list);
+              list.clear();
+              j = 0;
+            }
+            list.add(iter2.next());
+            j++;
+          }
+
+          if (j > 0)
+            deleteRowIds(list);
+
+          // Now, find the set of documents that remain that match the document identifiers.
+          Set<String> docIdValues = new HashSet<String>();
+          for (int i = 0; i < identifierHashes.length; i++)
+          {
+            String docDBString = makeKey(identifierClasses[i],identifierHashes[i]);
+            docIdValues.add(docDBString);
+          }
+
+          // Now, perform n queries, each of them no larger the maxInClause in length.
+          // Create a list of row id's from this.
+          rowIDSet.clear();
+          iter = docIdValues.iterator();
+          j = 0;
+          List<String> list2 = new ArrayList<String>();
+          maxClauses = maxClausesRowIdsForDocIds(outputConnectionName);
+          while (iter.hasNext())
+          {
+            if (j == maxClauses)
+            {
+              findRowIdsForDocIds(outputConnectionName,rowIDSet,list2);
+              list2.clear();
+              j = 0;
+            }
+            list2.add(iter.next());
+            j++;
+          }
+
+          if (j > 0)
+            findRowIdsForDocIds(outputConnectionName,rowIDSet,list2);
+
+          // Next, go through the list of row IDs, and delete them in chunks
+          j = 0;
+          list.clear();
+          iter2 = rowIDSet.iterator();
+          maxClauses = maxClausesDeleteRowIds();
+          while (iter2.hasNext())
+          {
+            if (j == maxClauses)
+            {
+              deleteRowIds(list);
+              list.clear();
+              j = 0;
+            }
+            list.add(iter2.next());
+            j++;
+          }
+
+          if (j > 0)
+            deleteRowIds(list);
+
+        }
+        catch (ManifoldCFException e)
+        {
+          signalRollback();
+          throw e;
+        }
+        catch (Error e)
+        {
+          signalRollback();
+          throw e;
+        }
+        finally
+        {
+          endTransaction();
+        }
+      }
+      finally
+      {
+        lockManager.leaveLocks(null,null,lockArray);
+      }
+    }
+  }
+
   /** Calculate the clauses.
   */
   protected int maxClausesRowIdsForURIs(String outputConnectionName)
@@ -1336,6 +1595,63 @@ public class IncrementalIngester extends
     }
   }
 
+  /** Find out what URIs a SET of document URIs are currently ingested.
+  *@param outputConnectionName is the output connection name.
+  *@param identifierClasses is the array of identifier classes.
+  *@param identifierHashes is the array of document id's to check.
+  *@param componentHash is the component hash to check.
+  *@return the array of current document uri's.  Null returned for identifiers
+  * that don't exist in the index.
+  */
+  protected DeleteInfo[] getDocumentURIMultiple(String outputConnectionName, String[] identifierClasses, String[] identifierHashes, String componentHash)
+    throws ManifoldCFException
+  {
+    DeleteInfo[] rval = new DeleteInfo[identifierHashes.length];
+    Map<String,Integer> map = new HashMap<String,Integer>();
+    for (int i = 0; i < identifierHashes.length; i++)
+    {
+      map.put(makeKey(identifierClasses[i],identifierHashes[i]),new Integer(i));
+      rval[i] = null;
+    }
+
+    beginTransaction();
+    try
+    {
+      List<String> list = new ArrayList<String>();
+      int maxCount = maxClauseDocumentURIChunk(outputConnectionName,componentHash);
+      int j = 0;
+      Iterator<String> iter = map.keySet().iterator();
+      while (iter.hasNext())
+      {
+        if (j == maxCount)
+        {
+          getDocumentURIChunk(rval,map,outputConnectionName,list,componentHash);
+          j = 0;
+          list.clear();
+        }
+        list.add(iter.next());
+        j++;
+      }
+      if (j > 0)
+        getDocumentURIChunk(rval,map,outputConnectionName,list,componentHash);
+      return rval;
+    }
+    catch (ManifoldCFException e)
+    {
+      signalRollback();
+      throw e;
+    }
+    catch (Error e)
+    {
+      signalRollback();
+      throw e;
+    }
+    finally
+    {
+      endTransaction();
+    }
+  }
+
   /** Look up ingestion data for a set of documents.
   *@param rval is a map of output key to document data, in no particular order, which will be loaded with all matching results.
   *@param pipelineSpecificationBasics are the pipeline specifications corresponding to the identifier classes and hashes.
@@ -1457,7 +1773,7 @@ public class IncrementalIngester extends
       new MultiClause(outputConnNameField,outputConnectionNames)});
       
     // Get the primary records associated with this hash value
-    IResultSet set = performQuery("SELECT "+idField+","+outputConnNameField+","+docKeyField+","+lastVersionField+","+lastOutputVersionField+","+authorityNameField+","+forcedParamsField+","+lastTransformationVersionField+
+    IResultSet set = performQuery("SELECT "+idField+","+outputConnNameField+","+docKeyField+","+componentHashField+","+lastVersionField+","+lastOutputVersionField+","+authorityNameField+","+forcedParamsField+","+lastTransformationVersionField+
       " FROM "+getTableName()+" WHERE "+query,newList,null,null);
 
     // Now, go through the original request once more, this time building the result
@@ -1470,6 +1786,7 @@ public class IncrementalIngester extends
       {
         Long id = (Long)row.getValue(idField);
         String outputConnectionName = (String)row.getValue(outputConnNameField);
+        String componentHash = (String)row.getValue(componentHashField);
         String lastVersion = (String)row.getValue(lastVersionField);
         if (lastVersion == null)
           lastVersion = "";
@@ -1487,7 +1804,7 @@ public class IncrementalIngester extends
           authorityName = "";
         int indexValue = position.intValue();
         rval.addStatus(identifierClasses[indexValue],identifierHashes[indexValue],outputConnectionName,
-          new DocumentIngestStatus(lastVersion,lastTransformationVersion,lastOutputVersion,paramVersion,authorityName));
+          componentHash,new DocumentIngestStatus(lastVersion,lastTransformationVersion,lastOutputVersion,paramVersion,authorityName));
       }
     }
   }
@@ -1688,6 +2005,7 @@ public class IncrementalIngester extends
   /** Note the ingestion of a document, or the "update" of a document.
   *@param outputConnectionName is the name of the output connection.
   *@param docKey is the key string describing the document.
+  *@param componentHash is the component identifier hash for this document.
   *@param documentVersion is a string describing the new version of the document.
   *@param transformationVersion is a string describing all current transformations for the document.
   *@param outputVersion is the version string calculated for the output connection.
@@ -1699,7 +2017,7 @@ public class IncrementalIngester extends
   *@param documentURIHash is the hash of the document uri.
   */
   protected void noteDocumentIngest(String outputConnectionName,
-    String docKey, String documentVersion, String transformationVersion,
+    String docKey, String componentHash, String documentVersion, String transformationVersion,
     String outputVersion, String packedForcedParameters,
     String authorityNameString,
     long ingestTime, String documentURI, String documentURIHash)
@@ -1728,6 +2046,8 @@ public class IncrementalIngester extends
 
       // Try the update first.  Typically this succeeds except in the case where a doc is indexed for the first time.
       map.clear();
+      if (componentHash != null)
+        map.put(componentHashField,componentHash);
       map.put(lastVersionField,documentVersion);
       map.put(lastTransformationVersionField,transformationVersion);
       map.put(lastOutputVersionField,outputVersion);
@@ -1755,7 +2075,8 @@ public class IncrementalIngester extends
           ArrayList list = new ArrayList();
           String query = buildConjunctionClause(list,new ClauseDescription[]{
             new UnitaryClause(docKeyField,docKey),
-            new UnitaryClause(outputConnNameField,outputConnectionName)});
+            new UnitaryClause(outputConnNameField,outputConnectionName),
+            ((componentHash==null)?new NullCheckClause(componentHashField,true):new UnitaryClause(componentHashField,componentHash))});
           IResultSet set = performQuery("SELECT "+idField+","+changeCountField+" FROM "+getTableName()+" WHERE "+
             query+" FOR UPDATE",list,null,null);
           IResultRow row = null;
@@ -1807,6 +2128,8 @@ public class IncrementalIngester extends
 
       // Set up for insert
       map.clear();
+      if (componentHash != null)
+        map.put(componentHashField,componentHash);
       map.put(lastVersionField,documentVersion);
       map.put(lastTransformationVersionField,transformationVersion);
       map.put(lastOutputVersionField,outputVersion);
@@ -1870,9 +2193,10 @@ public class IncrementalIngester extends
   *@param rval is the string array where the uris should be put.
   *@param map is the map from id to index.
   *@param clause is the in clause for the query.
-  *@param list is the parameter list for the query.
+  *@param list are the doc keys for the query.
   */
-  protected void getDocumentURIChunk(DeleteInfo[] rval, Map<String,Integer> map, String outputConnectionName, List<String> list)
+  protected void getDocumentURIChunk(DeleteInfo[] rval, Map<String,Integer> map, String outputConnectionName,
+    List<String> list)
     throws ManifoldCFException
   {
     ArrayList newList = new ArrayList();
@@ -1900,6 +2224,52 @@ public class IncrementalIngester extends
     }
   }
 
+  /** Calculate how many clauses at a time
+  */
+  protected int maxClauseDocumentURIChunk(String outputConnectionName, String componentHash)
+  {
+    return findConjunctionClauseMax(new ClauseDescription[]{
+      new UnitaryClause(outputConnNameField,outputConnectionName),
+      ((componentHash==null)?new NullCheckClause(componentHashField,true):new UnitaryClause(componentHashField,componentHash))});
+  }
+
+  /** Get a chunk of document uris.
+  *@param rval is the string array where the uris should be put.
+  *@param map is the map from id to index.
+  *@param clause is the in clause for the query.
+  *@param list are the doc keys for the query.
+  *@param componentHash is the component hash, if any, for the query.
+  */
+  protected void getDocumentURIChunk(DeleteInfo[] rval, Map<String,Integer> map, String outputConnectionName,
+    List<String> list, String componentHash)
+    throws ManifoldCFException
+  {
+    ArrayList newList = new ArrayList();
+    String query = buildConjunctionClause(newList,new ClauseDescription[]{
+      new MultiClause(docKeyField,list),
+      new UnitaryClause(outputConnNameField,outputConnectionName),
+      ((componentHash==null)?new NullCheckClause(componentHashField,true):new UnitaryClause(componentHashField,componentHash))});
+      
+    IResultSet set = performQuery("SELECT "+docKeyField+","+docURIField+","+lastOutputVersionField+" FROM "+getTableName()+" WHERE "+
+      query,newList,null,null);
+
+    // Go through list and put into buckets.
+    for (int i = 0; i < set.getRowCount(); i++)
+    {
+      IResultRow row = set.getRow(i);
+      String docHash = row.getValue(docKeyField).toString();
+      Integer position = (Integer)map.get(docHash);
+      if (position != null)
+      {
+        String lastURI = (String)row.getValue(docURIField);
+        if (lastURI != null && lastURI.length() == 0)
+          lastURI = null;
+        String lastOutputVersion = (String)row.getValue(lastOutputVersionField);
+        rval[position.intValue()] = new DeleteInfo(lastURI,lastOutputVersion);
+      }
+    }
+  }
+
   /** Count the clauses
   */
   protected int maxClauseDocumentIngestDataChunk(String outputConnectionName)
@@ -2302,23 +2672,23 @@ public class IncrementalIngester extends
       this.pipelineConnectionsWithVersions = pipelineConnectionsWithVersions;
     }
 
-    public int addOrReplaceDocumentWithException(String docKey, String documentURI, RepositoryDocument document, String newDocumentVersion, String newParameterVersion, String authorityNameString, IOutputActivity finalActivity, long ingestTime)
+    public int addOrReplaceDocumentWithException(String docKey, String componentHash, String documentURI, RepositoryDocument document, String newDocumentVersion, String newParameterVersion, String authorityNameString, IOutputActivity finalActivity, long ingestTime)
       throws ManifoldCFException, ServiceInterruption, IOException
     {
-      PipelineAddFanout entryPoint = buildAddPipeline(finalActivity,newDocumentVersion,newParameterVersion,authorityNameString,ingestTime,docKey);
+      PipelineAddFanout entryPoint = buildAddPipeline(finalActivity,newDocumentVersion,newParameterVersion,authorityNameString,ingestTime,docKey,componentHash);
       return entryPoint.sendDocument(documentURI,document);
     }
 
-    public void noDocument(String docKey, String newDocumentVersion, String newParameterVersion, String authorityNameString, IOutputActivity finalActivity, long ingestTime)
+    public void noDocument(String docKey, String componentHash, String newDocumentVersion, String newParameterVersion, String authorityNameString, IOutputActivity finalActivity, long ingestTime)
       throws ManifoldCFException, ServiceInterruption
     {
-      PipelineAddFanout entryPoint = buildAddPipeline(finalActivity,newDocumentVersion,newParameterVersion,authorityNameString,ingestTime,docKey);
+      PipelineAddFanout entryPoint = buildAddPipeline(finalActivity,newDocumentVersion,newParameterVersion,authorityNameString,ingestTime,docKey,componentHash);
       entryPoint.noDocument();
     }
 
     protected PipelineAddFanout buildAddPipeline(IOutputActivity finalActivity,
       String newDocumentVersion, String newParameterVersion, String newAuthorityNameString,
-      long ingestTime, String docKey)
+      long ingestTime, String docKey, String componentHash)
     {
       // Algorithm for building a pipeline:
       // (1) We start with the set of final output connection stages, and build an entry point for each one.  That's our "current set".
@@ -2374,6 +2744,7 @@ public class IncrementalIngester extends
           newDocumentVersion,
           newParameterVersion,
           docKey,
+          componentHash,
           newAuthorityNameString);
         currentSet.put(new Integer(outputStage), outputStageEntryPoint);
       }
@@ -2807,6 +3178,7 @@ public class IncrementalIngester extends
     protected final String documentVersion;
     protected final String parameterVersion;
     protected final String docKey;
+    protected final String componentHash;
     protected final IOutputActivity activity;
     
     public OutputAddEntryPoint(IOutputConnector outputConnector,
@@ -2819,6 +3191,7 @@ public class IncrementalIngester extends
       String documentVersion,
       String parameterVersion,
       String docKey,
+      String componentHash,
       String authorityNameString)
     {
       super(outputConnector,outputDescriptionString,authorityNameString,activity,isActive);
@@ -2829,6 +3202,7 @@ public class IncrementalIngester extends
       this.documentVersion = documentVersion;
       this.parameterVersion = parameterVersion;
       this.docKey = docKey;
+      this.componentHash = componentHash;
       this.activity = activity;
     }
     
@@ -2958,15 +3332,15 @@ public class IncrementalIngester extends
           // This is a marker that says "something is there"; it has an empty version, which indicates
           // that we don't know anything about it.  That means it will be reingested when the
           // next version comes along, and will be deleted if called for also.
-          noteDocumentIngest(outputConnectionName,docKey,null,null,null,null,null,ingestTime,documentURI,documentURIHash);
+          noteDocumentIngest(outputConnectionName,docKey,componentHash,null,null,null,null,null,ingestTime,documentURI,documentURIHash);
           int result = super.addOrReplaceDocumentWithException(documentURI, document);
-          noteDocumentIngest(outputConnectionName,docKey,documentVersion,transformationVersion,pipelineDescriptionString.getVersionString(),parameterVersion,authorityNameString,ingestTime,documentURI,documentURIHash);
+          noteDocumentIngest(outputConnectionName,docKey,componentHash,documentVersion,transformationVersion,pipelineDescriptionString.getVersionString(),parameterVersion,authorityNameString,ingestTime,documentURI,documentURIHash);
           return result;
         }
 
         // If we get here, it means we are noting that the document was examined, but that no change was required.  This is signaled
         // to noteDocumentIngest by having the null documentURI.
-        noteDocumentIngest(outputConnectionName,docKey,documentVersion,transformationVersion,pipelineDescriptionString.getVersionString(),parameterVersion,authorityNameString,ingestTime,null,null);
+        noteDocumentIngest(outputConnectionName,docKey,componentHash,documentVersion,transformationVersion,pipelineDescriptionString.getVersionString(),parameterVersion,authorityNameString,ingestTime,null,null);
         return IPipelineConnector.DOCUMENTSTATUS_ACCEPTED;
       }
       finally

Modified: manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java?rev=1612102&r1=1612101&r2=1612102&view=diff
==============================================================================
--- manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java (original)
+++ manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java Sun Jul 20 15:20:42 2014
@@ -66,7 +66,14 @@ public interface IIncrementalIngester
   *@return the last indexed output connection name.
   */
   public String getLastIndexedOutputConnectionName(IPipelineSpecificationBasic pipelineSpecificationBasic);
-  
+
+  /** From a pipeline specification, get the name of the output connection that will be indexed first
+  * in the pipeline.
+  *@param pipelineSpecificationBasic is the basic pipeline specification.
+  *@return the first indexed output connection name.
+  */
+  public String getFirstIndexedOutputConnectionName(IPipelineSpecificationBasic pipelineSpecificationBasic);
+
   /** Get an output version string for a document.
   *@param outputConnectionName is the name of the output connection associated with this action.
   *@param spec is the output specification.
@@ -154,12 +161,13 @@ public interface IIncrementalIngester
   *@param pipelineSpecificationBasic is the basic pipeline specification needed.
   *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
   *@param identifierHash is the hashed document identifier.
+  *@param componentHash is the hashed component identifier, if any.
   *@param documentVersion is the document version.
   *@param recordTime is the time at which the recording took place, in milliseconds since epoch.
   */
   public void documentRecord(
     IPipelineSpecificationBasic pipelineSpecificationBasic,
-    String identifierClass, String identifierHash,
+    String identifierClass, String identifierHash, String componentHash,
     String documentVersion, long recordTime)
     throws ManifoldCFException;
 
@@ -171,6 +179,7 @@ public interface IIncrementalIngester
   *@param pipelineSpecificationWithVersions is the pipeline specification with already-fetched output versioning information.
   *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
   *@param identifierHash is the hashed document identifier.
+  *@param componentHash is the hashed component identifier, if any.
   *@param documentVersion is the document version.
   *@param parameterVersion is the version string for the forced parameters.
   *@param authorityName is the name of the authority associated with the document, if any.
@@ -179,7 +188,7 @@ public interface IIncrementalIngester
   */
   public void documentNoData(
     IPipelineSpecificationWithVersions pipelineSpecificationWithVersions,
-    String identifierClass, String identifierHash,
+    String identifierClass, String identifierHash, String componentHash,
     String documentVersion,
     String parameterVersion,
     String authorityName,
@@ -195,6 +204,7 @@ public interface IIncrementalIngester
   *@param pipelineSpecificationWithVersions is the pipeline specification with already-fetched output versioning information.
   *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
   *@param identifierHash is the hashed document identifier.
+  *@param componentHash is the hashed component identifier, if any.
   *@param documentVersion is the document version.
   *@param parameterVersion is the version string for the forced parameters.
   *@param authorityName is the name of the authority associated with the document, if any.
@@ -207,7 +217,7 @@ public interface IIncrementalIngester
   */
   public boolean documentIngest(
     IPipelineSpecificationWithVersions pipelineSpecificationWithVersions,
-    String identifierClass, String identifierHash,
+    String identifierClass, String identifierHash, String componentHash,
     String documentVersion,
     String parameterVersion,
     String authorityName,
@@ -216,6 +226,32 @@ public interface IIncrementalIngester
     IOutputActivity activities)
     throws ManifoldCFException, ServiceInterruption, IOException;
 
+  /** Remove a document component from the search engine index.
+  *@param pipelineSpecificationBasic is the basic pipeline specification.
+  *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
+  *@param identifierHash is the hash of the id of the document.
+  *@param componentHash is the hashed component identifier, if any.
+  *@param activities is the object to use to log the details of the ingestion attempt.  May be null.
+  */
+  public void documentRemove(
+    IPipelineSpecificationBasic pipelineSpecificationBasic,
+    String identifierClass, String identifierHash, String componentHash,
+    IOutputRemoveActivity activities)
+    throws ManifoldCFException, ServiceInterruption;
+
+  /** Remove multiple document components from the search engine index.
+  *@param pipelineSpecificationBasic is the basic pipeline specification.
+  *@param identifierClasses are the names of the spaces in which the identifier hash should be interpreted.
+  *@param identifierHashes are the hashes of the ids of the documents.
+  *@param componentHash is the hashed component identifier, if any.
+  *@param activities is the object to use to log the details of the ingestion attempt.  May be null.
+  */
+  public void documentRemoveMultiple(
+    IPipelineSpecificationBasic pipelineSpecificationBasic,
+    String[] identifierClasses, String[] identifierHashes, String componentHash,
+    IOutputRemoveActivity activities)
+    throws ManifoldCFException, ServiceInterruption;
+
   /** Note the fact that we checked a document (and found that it did not need to be ingested, because the
   * versions agreed).
   *@param pipelineSpecificationBasic is a pipeline specification.
@@ -242,7 +278,7 @@ public interface IIncrementalIngester
     long checkTime)
     throws ManifoldCFException;
 
-  /** Delete multiple documents from the search engine index.
+  /** Delete multiple documents, and their components, from the search engine index.
   *@param pipelineSpecificationBasics are the pipeline specifications associated with the documents.
   *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
   *@param identifierHashes is tha array of document identifier hashes if the documents.
@@ -254,7 +290,7 @@ public interface IIncrementalIngester
     IOutputRemoveActivity activities)
     throws ManifoldCFException, ServiceInterruption;
 
-  /** Delete multiple documents from the search engine index.
+  /** Delete multiple documents, and their components, from the search engine index.
   *@param pipelineSpecificationBasic is the basic pipeline specification.
   *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
   *@param identifierHashes is tha array of document identifier hashes if the documents.
@@ -266,7 +302,7 @@ public interface IIncrementalIngester
     IOutputRemoveActivity activities)
     throws ManifoldCFException, ServiceInterruption;
 
-  /** Delete a document from the search engine index.
+  /** Delete a document, and all its components, from the search engine index.
   *@param pipelineSpecificationBasic is the basic pipeline specification.
   *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
   *@param identifierHash is the hash of the id of the document.

Modified: manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IngestStatuses.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IngestStatuses.java?rev=1612102&r1=1612101&r2=1612102&view=diff
==============================================================================
--- manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IngestStatuses.java (original)
+++ manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IngestStatuses.java Sun Jul 20 15:20:42 2014
@@ -28,7 +28,7 @@ public class IngestStatuses
 {
   public static final String _rcsid = "@(#)$Id$";
 
-  protected final Map<OutputKey,DocumentIngestStatus> statuses = new HashMap<OutputKey,DocumentIngestStatus>();
+  protected final Map<OutputKey,DocumentIngestStatusSet> statuses = new HashMap<OutputKey,DocumentIngestStatusSet>();
   
   public IngestStatuses()
   {
@@ -38,20 +38,29 @@ public class IngestStatuses
   *@param documentClass is the document class.
   *@param documentIDHash is the document id's hash value.
   *@param outputConnectionName is the output connection name.
-  *@param status is the status record.
+  *@param componentIDHash is the component ID hash value.
+  *@param status is the status.
   */
-  public void addStatus(String documentClass, String documentIDHash, String outputConnectionName, DocumentIngestStatus status)
+  public void addStatus(String documentClass, String documentIDHash, String outputConnectionName,
+    String componentIDHash, DocumentIngestStatus status)
   {
-    statuses.put(new OutputKey(documentClass,documentIDHash,outputConnectionName),status);
+    OutputKey ok = new OutputKey(documentClass,documentIDHash,outputConnectionName);
+    DocumentIngestStatusSet set = statuses.get(ok);
+    if (set == null)
+    {
+      set = new DocumentIngestStatusSet();
+      statuses.put(ok,set);
+    }
+    set.addDocumentStatus(componentIDHash,status);
   }
   
   /** Retrieve a status record.
   *@param documentClass is the document class.
   *@param documentIDHash is the document id's hash value.
   *@param outputConnectionName is the output connection name.
-  *@return the status record, if record.
+  *@return the status record, if exists.
   */
-  public DocumentIngestStatus getStatus(String documentClass, String documentIDHash, String outputConnectionName)
+  public DocumentIngestStatusSet getStatus(String documentClass, String documentIDHash, String outputConnectionName)
   {
     return statuses.get(new OutputKey(documentClass,documentIDHash,outputConnectionName));
   }

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IExistingVersions.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IExistingVersions.java?rev=1612102&r1=1612101&r2=1612102&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IExistingVersions.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IExistingVersions.java Sun Jul 20 15:20:42 2014
@@ -28,10 +28,19 @@ public interface IExistingVersions
 {
   public static final String _rcsid = "@(#)$Id$";
 
-  /** Retrieve an existing version string given a document identifier.
+  /** Retrieve the primary existing version string given a document identifier.
   *@param documentIdentifier is the document identifier.
   *@return the document version string, or null if the document was never previously indexed.
   */
-  public String getIndexedVersionString(String documentIdentifier);
+  public String getIndexedVersionString(String documentIdentifier)
+    throws ManifoldCFException;
 
+  /** Retrieve a component existing version string given a document identifier.
+  *@param documentIdentifier is the document identifier.
+  *@param componentIdentifier is the component identifier, if any.
+  *@return the document version string, or null of the document component was never previously indexed.
+  */
+  public String getIndexedVersionString(String documentIdentifier, String componentIdentifier)
+    throws ManifoldCFException;
+  
 }

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IProcessActivity.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IProcessActivity.java?rev=1612102&r1=1612101&r2=1612102&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IProcessActivity.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IProcessActivity.java Sun Jul 20 15:20:42 2014
@@ -56,6 +56,19 @@ public interface IProcessActivity extend
     String newVersionString)
     throws ManifoldCFException;
 
+  /** Check if a document needs to be reindexed, based on a computed version string.
+  * Call this method to determine whether reindexing is necessary.  Pass in a newly-computed version
+  * string.  This method will return "true" if the document needs to be re-indexed.
+  *@param documentIdentifier is the document identifier.
+  *@param componentIdentifier is the component document identifier, if any.
+  *@param newVersionString is the newly-computed version string.
+  *@return true if the document needs to be reindexed.
+  */
+  public boolean checkDocumentNeedsReindexing(String documentIdentifier,
+    String componentIdentifier,
+    String newVersionString)
+    throws ManifoldCFException;
+
   /** Add a document description to the current job's queue.
   *@param documentIdentifier is the local document identifier to add (for the connector that
   * fetched the document).
@@ -139,7 +152,23 @@ public interface IProcessActivity extend
   *@param data is the document data.  The data is closed after ingestion is complete.
   *@throws IOException only when data stream reading fails.
   */
-  public void ingestDocumentWithException(String documentIdentifier, String version, String documentURI, RepositoryDocument data)
+  public void ingestDocumentWithException(String documentIdentifier,
+    String version, String documentURI, RepositoryDocument data)
+    throws ManifoldCFException, ServiceInterruption, IOException;
+
+  /** Ingest the current document.
+  *@param documentIdentifier is the document's identifier.
+  *@param componentIdentifier is the component document identifier, if any.
+  *@param version is the version of the document, as reported by the getDocumentVersions() method of the
+  *       corresponding repository connector.
+  *@param documentURI is the URI to use to retrieve this document from the search interface (and is
+  *       also the unique key in the index).
+  *@param data is the document data.  The data is closed after ingestion is complete.
+  *@throws IOException only when data stream reading fails.
+  */
+  public void ingestDocumentWithException(String documentIdentifier,
+    String componentIdentifier,
+    String version, String documentURI, RepositoryDocument data)
     throws ManifoldCFException, ServiceInterruption, IOException;
 
   /** Ingest the current document.
@@ -161,17 +190,38 @@ public interface IProcessActivity extend
   *@param documentIdentifier is the document's local identifier.
   *@param version is the version string to be recorded for the document.
   */
-  public void noDocument(String documentIdentifier, String version)
+  public void noDocument(String documentIdentifier,
+    String version)
     throws ManifoldCFException, ServiceInterruption;
 
-  /** Delete the specified document permanently from the search engine index, and from the status table.
-  * This method does NOT keep track of any document version information for the document and thus can
-  * lead to "churn", whereby the same document is queued, processed,
-  * and removed on subsequent crawls.  It is therefore preferable to use noDocument() instead,
-  * in any case where the same decision will need to be made over and over.
+  /** Remove the specified document from the search engine index, and update the
+  * recorded version information for the document.
+  *@param documentIdentifier is the document's local identifier.
+  *@param componentIdentifier is the component document identifier, if any.
+  *@param version is the version string to be recorded for the document.
+  */
+  public void noDocument(String documentIdentifier,
+    String componentIdentifier,
+    String version)
+    throws ManifoldCFException, ServiceInterruption;
+
+  /** Remove the specified document primary component permanently from the search engine index,
+  * and from the status table.  Use this method when your document has components and
+  * now also has a primary document, but will not have a primary document again for the foreseeable
+  * future.  This is a rare situation.
   *@param documentIdentifier is the document's identifier.
   */
-  public void deleteDocument(String documentIdentifier)
+  public void removeDocument(String documentIdentifier)
+    throws ManifoldCFException, ServiceInterruption;
+
+  /** Retain existing document component.  Use this method to signal that an already-existing
+  * document component does not need to be reindexed.  The default behavior is to remove
+  * components that are not mentioned during processing.
+  *@param documentIdentifier is the document's identifier.
+  *@param componentIdentifier is the component document identifier, which cannot be null.
+  */
+  public void retainDocument(String documentIdentifier,
+    String componentIdentifier)
     throws ManifoldCFException;
 
   /** Record a document version, WITHOUT reindexing it, or removing it.  (Other
@@ -181,7 +231,32 @@ public interface IProcessActivity extend
   *@param documentIdentifier is the document identifier.
   *@param version is the document version.
   */
-  public void recordDocument(String documentIdentifier, String version)
+  public void recordDocument(String documentIdentifier,
+    String version)
+    throws ManifoldCFException;
+
+  /** Record a document version, WITHOUT reindexing it, or removing it.  (Other
+  * documents with the same URL, however, will still be removed.)  This is
+  * useful if the version string changes but the document contents are known not
+  * to have changed.
+  *@param documentIdentifier is the document identifier.
+  *@param componentIdentifier is the component document identifier, if any.
+  *@param version is the document version.
+  */
+  public void recordDocument(String documentIdentifier,
+    String componentIdentifier,
+    String version)
+    throws ManifoldCFException;
+
+  /** Delete the specified document permanently from the search engine index, and from the status table,
+  * along with all its components.
+  * This method does NOT keep track of any document version information for the document and thus can
+  * lead to "churn", whereby the same document is queued, processed,
+  * and removed on subsequent crawls.  It is therefore preferable to use noDocument() instead,
+  * in any case where the same decision will need to be made over and over.
+  *@param documentIdentifier is the document's identifier.
+  */
+  public void deleteDocument(String documentIdentifier)
     throws ManifoldCFException;
 
   /** Delete the current document from the search engine index, while keeping track of the version information
@@ -194,7 +269,6 @@ public interface IProcessActivity extend
   public void deleteDocument(String documentIdentifier, String version)
     throws ManifoldCFException, ServiceInterruption;
 
-
   /** Override the schedule for the next time a document is crawled.
   * Calling this method allows you to set an upper recrawl bound, lower recrawl bound, upper expire bound, lower expire bound,
   * or a combination of these, on a specific document.  This method is only effective if the job is a continuous one, and if the

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecificationWithVersions.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecificationWithVersions.java?rev=1612102&r1=1612101&r2=1612102&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecificationWithVersions.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecificationWithVersions.java Sun Jul 20 15:20:42 2014
@@ -28,13 +28,14 @@ public class PipelineSpecificationWithVe
 {
   protected final IPipelineSpecification pipelineSpecification;
   protected final QueuedDocument queuedDocument;
+  protected final String componentIDHash;
     
   public PipelineSpecificationWithVersions(IPipelineSpecification pipelineSpecification,
-    QueuedDocument queuedDocument)
-    throws ManifoldCFException, ServiceInterruption
+    QueuedDocument queuedDocument, String componentIDHash)
   {
     this.pipelineSpecification = pipelineSpecification;
     this.queuedDocument = queuedDocument;
+    this.componentIDHash = componentIDHash;
   }
   
   /** Get pipeline specification.
@@ -49,7 +50,10 @@ public class PipelineSpecificationWithVe
   protected DocumentIngestStatus getStatus(int index)
   {
     IPipelineSpecificationBasic basic = pipelineSpecification.getBasicPipelineSpecification();
-    return queuedDocument.getLastIngestedStatus(basic.getStageConnectionName(basic.getOutputStage(index)));
+    DocumentIngestStatusSet set = queuedDocument.getLastIngestedStatus(basic.getStageConnectionName(basic.getOutputStage(index)));
+    if (set == null)
+      return null;
+    return set.getComponent(componentIDHash);
   }
   
   /** For a given output index, return a document version string.

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/QueuedDocument.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/QueuedDocument.java?rev=1612102&r1=1612101&r2=1612102&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/QueuedDocument.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/QueuedDocument.java Sun Jul 20 15:20:42 2014
@@ -39,7 +39,7 @@ public class QueuedDocument
   /** The document description. */
   protected final DocumentDescription documentDescription;
   /** The last ingested status, null meaning "never ingested". */
-  protected final Map<String,DocumentIngestStatus> lastIngestedStatus;
+  protected final Map<String,DocumentIngestStatusSet> lastIngestedStatus;
   /** The binnames for the document, according to the connector */
   protected final String[] binNames;
   /** This flag indicates whether the document has been processed or not. */
@@ -50,7 +50,7 @@ public class QueuedDocument
   *@param lastIngestedStatus is the document's last ingested status.
   *@param binNames are the bins associated with the document.
   */
-  public QueuedDocument(DocumentDescription documentDescription, Map<String,DocumentIngestStatus> lastIngestedStatus, String[] binNames)
+  public QueuedDocument(DocumentDescription documentDescription, Map<String,DocumentIngestStatusSet> lastIngestedStatus, String[] binNames)
   {
     this.documentDescription = documentDescription;
     this.lastIngestedStatus = lastIngestedStatus;
@@ -69,7 +69,7 @@ public class QueuedDocument
   *@param outputConnectionName is the name of the output connection.
   *@return the last ingested status for that output, or null if not found.
   */
-  public DocumentIngestStatus getLastIngestedStatus(String outputConnectionName)
+  public DocumentIngestStatusSet getLastIngestedStatus(String outputConnectionName)
   {
     if (lastIngestedStatus == null)
       return null;

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java?rev=1612102&r1=1612101&r2=1612102&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java Sun Jul 20 15:20:42 2014
@@ -257,9 +257,9 @@ public class StufferThread extends Threa
             for (int j = 0; j < pipelineSpecifications[i].getOutputCount(); j++)
             {
               String outputName = pipelineSpecifications[i].getStageConnectionName(pipelineSpecifications[i].getOutputStage(j));
-              DocumentIngestStatus status = statuses.getStatus(documentClasses[i],documentIDHashes[i],outputName);
-              if (status != null)
-                versions[i].put(outputName,status);
+              DocumentIngestStatusSet statusSet = statuses.getStatus(documentClasses[i],documentIDHashes[i],outputName);
+              if (statusSet != null)
+                versions[i].put(outputName,statusSet);
             }
           }
 
@@ -334,7 +334,7 @@ public class StufferThread extends Threa
               binNames = new String[]{""};
             }
 
-            QueuedDocument qd = new QueuedDocument(descs[i],(Map<String,DocumentIngestStatus>)versions[i],binNames);
+            QueuedDocument qd = new QueuedDocument(descs[i],(Map<String,DocumentIngestStatusSet>)versions[i],binNames);
 
             // Grab the arraylist that's there, or create it.
             List<QueuedDocument> set = documentSets.get(jobID);

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1612102&r1=1612101&r2=1612102&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java Sun Jul 20 15:20:42 2014
@@ -346,22 +346,20 @@ public class WorkerThread extends Thread
                     boolean isDefaultAuthority = (aclAuthority.length() == 0);
 
                     // Build the processActivity object
+                    Map<String,QueuedDocument> previousDocuments = new HashMap<String,QueuedDocument>();
                     
-                    
-                    Map<String,IPipelineSpecificationWithVersions> fetchPipelineSpecifications = new HashMap<String,IPipelineSpecificationWithVersions>();
                     String[] documentIDs = new String[activeDocuments.size()];
                     int k = 0;
                     for (QueuedDocument qd : activeDocuments)
                     {
-                      fetchPipelineSpecifications.put(qd.getDocumentDescription().getDocumentIdentifierHash(),
-                        new PipelineSpecificationWithVersions(pipelineSpecification,qd));
+                      previousDocuments.put(qd.getDocumentDescription().getDocumentIdentifierHash(),qd);
                       documentIDs[k++] = qd.getDocumentDescription().getDocumentIdentifier();
                     }
                     
                     ProcessActivity activity = new ProcessActivity(job.getID(),processID,
                       threadContext,rt,jobManager,ingester,
                       connectionName,pipelineSpecification,
-                      fetchPipelineSpecifications,
+                      previousDocuments,
                       currentTime,
                       job.getExpiration(),
                       job.getForcedMetadata(),
@@ -380,6 +378,41 @@ public class WorkerThread extends Thread
                       try
                       {
                         connector.processDocuments(documentIDs,existingVersions,job.getSpecification(),activity,jobType,isDefaultAuthority);
+                        
+                        // Now do everything that the connector might have done if we were not doing it for it.
+
+                        // Right now, that's just getting rid of untouched components.
+                        for (QueuedDocument qd : activeDocuments)
+                        {
+                          String documentIdentifier = qd.getDocumentDescription().getDocumentIdentifier();
+                          if (!activity.wasDocumentAborted(documentIdentifier) && !activity.wasDocumentDeleted(documentIdentifier))
+                          {
+                            String documentIdentifierHash = qd.getDocumentDescription().getDocumentIdentifierHash();
+                            // In order to be able to loop over all the components that the incremental ingester knows about, we need to know
+                            // what the FIRST output is.
+                            DocumentIngestStatusSet set = qd.getLastIngestedStatus(ingester.getFirstIndexedOutputConnectionName(pipelineSpecificationBasic));
+                            if (set != null)
+                            {
+                              Iterator<String> componentHashes = set.componentIterator();
+                              while (componentHashes.hasNext())
+                              {
+                                String componentHash = componentHashes.next();
+                                // Check whether we've indexed or not
+                                if (!activity.wasDocumentComponentTouched(documentIdentifier,
+                                  componentHash))
+                                {
+                                  // This component must be removed.
+                                  ingester.documentRemove(
+                                    pipelineSpecificationBasic,
+                                    connectionName,documentIdentifierHash,componentHash,
+                                    ingestLogger);
+                                }
+                              }
+                            }
+                          }
+                        }
+
+                        // Done with connector functionality!
                       }
                       catch (ServiceInterruption e)
                       {
@@ -457,6 +490,7 @@ public class WorkerThread extends Thread
                           ingesterCheckList.add(qd.getDocumentDescription().getDocumentIdentifierHash());
                         }
                       }
+                      
 
                       if (serviceInterruption != null)
                       {
@@ -1086,7 +1120,7 @@ public class WorkerThread extends Thread
     protected final IIncrementalIngester ingester;
     protected final String connectionName;
     protected final IPipelineSpecification pipelineSpecification;
-    protected final Map<String,IPipelineSpecificationWithVersions> fetchPipelineSpecifications;
+    protected final Map<String,QueuedDocument> previousDocuments;
     protected final long currentTime;
     protected final Long expireInterval;
     protected final Map<String,Set<String>> forcedMetadata;
@@ -1122,6 +1156,10 @@ public class WorkerThread extends Thread
     // Whether document was deleted
     protected final Set<String> documentDeletedSet = new HashSet<String>();
     
+    // Whether a component was touched or not, keyed by document identifier.
+    // This does not include primary document.  The set is keyed by component id hash.
+    protected final Map<String,Set<String>> touchedComponentSet = new HashMap<String,Set<String>>();
+    
     /** Constructor.
     *@param jobManager is the job manager
     *@param ingester is the ingester
@@ -1132,7 +1170,7 @@ public class WorkerThread extends Thread
       IIncrementalIngester ingester,
       String connectionName,
       IPipelineSpecification pipelineSpecification,
-      Map<String,IPipelineSpecificationWithVersions> fetchPipelineSpecifications,
+      Map<String,QueuedDocument> previousDocuments,
       long currentTime,
       Long expireInterval,
       Map<String,Set<String>> forcedMetadata,
@@ -1151,7 +1189,7 @@ public class WorkerThread extends Thread
       this.ingester = ingester;
       this.connectionName = connectionName;
       this.pipelineSpecification = pipelineSpecification;
-      this.fetchPipelineSpecifications = fetchPipelineSpecifications;
+      this.previousDocuments = previousDocuments;
       this.currentTime = currentTime;
       this.expireInterval = expireInterval;
       this.forcedMetadata = forcedMetadata;
@@ -1183,6 +1221,17 @@ public class WorkerThread extends Thread
     {
       return touchedSet.contains(documentIdentifier);
     }
+
+    /** Check whether a document component was touched or not.
+    */
+    public boolean wasDocumentComponentTouched(String documentIdentifier,
+      String componentIdentifierHash)
+    {
+      Set<String> components = touchedComponentSet.get(documentIdentifier);
+      if (components == null)
+        return false;
+      return components.contains(componentIdentifierHash);
+    }
     
     /** Check whether document was deleted or not.
     */
@@ -1198,6 +1247,41 @@ public class WorkerThread extends Thread
       return abortSet.contains(documentIdentifier);
     }
     
+    /** Check if a document needs to be reindexed, based on a computed version string.
+    * Call this method to determine whether reindexing is necessary.  Pass in a newly-computed version
+    * string.  This method will return "true" if the document needs to be re-indexed.
+    *@param documentIdentifier is the document identifier.
+    *@param newVersionString is the newly-computed version string.
+    *@return true if the document needs to be reindexed.
+    */
+    @Override
+    public boolean checkDocumentNeedsReindexing(String documentIdentifier,
+      String newVersionString)
+      throws ManifoldCFException
+    {
+      return checkDocumentNeedsReindexing(documentIdentifier,null,newVersionString);
+    }
+
+    /** Check if a document needs to be reindexed, based on a computed version string.
+    * Call this method to determine whether reindexing is necessary.  Pass in a newly-computed version
+    * string.  This method will return "true" if the document needs to be re-indexed.
+    *@param documentIdentifier is the document identifier.
+    *@param componentIdentifier is the component document identifier, if any.
+    *@param newVersionString is the newly-computed version string.
+    *@return true if the document needs to be reindexed.
+    */
+    @Override
+    public boolean checkDocumentNeedsReindexing(String documentIdentifier,
+      String componentIdentifier,
+      String newVersionString)
+      throws ManifoldCFException
+    {
+      String documentIdentifierHash = ManifoldCF.hash(documentIdentifier);
+      String componentIdentifierHash = computeComponentIDHash(componentIdentifier);
+      IPipelineSpecificationWithVersions spec = computePipelineSpecification(documentIdentifierHash,componentIdentifierHash);
+      return ingester.checkFetchDocument(spec,newVersionString,parameterVersion,connection.getACLAuthority());
+    }
+
     /** Add a document description to the current job's queue.
     *@param localIdentifier is the local document identifier to add (for the connector that
     * fetched the document).
@@ -1299,23 +1383,6 @@ public class WorkerThread extends Thread
       existingDr.addPrerequisiteEvents(prereqEventNames);
     }
 
-    /** Check if a document needs to be reindexed, based on a computed version string.
-    * Call this method to determine whether reindexing is necessary.  Pass in a newly-computed version
-    * string.  This method will return "true" if the document needs to be re-indexed.
-    *@param documentIdentifier is the document identifier.
-    *@param newVersionString is the newly-computed version string.
-    *@return true if the document needs to be reindexed.
-    */
-    @Override
-    public boolean checkDocumentNeedsReindexing(String documentIdentifier,
-      String newVersionString)
-      throws ManifoldCFException
-    {
-      String documentIdentifierHash = ManifoldCF.hash(documentIdentifier);
-      IPipelineSpecificationWithVersions spec = fetchPipelineSpecifications.get(documentIdentifierHash);
-      return ingester.checkFetchDocument(spec,newVersionString,parameterVersion,connection.getACLAuthority());
-    }
-
     /** Add a document description to the current job's queue.
     *@param localIdentifier is the local document identifier to add (for the connector that
     * fetched the document).
@@ -1415,12 +1482,31 @@ public class WorkerThread extends Thread
     public void recordDocument(String documentIdentifier, String version)
       throws ManifoldCFException
     {
+      recordDocument(documentIdentifier,null,version);
+    }
+
+    /** Record a document version, WITHOUT reindexing it, or removing it.  (Other
+    * documents with the same URL, however, will still be removed.)  This is
+    * useful if the version string changes but the document contents are known not
+    * to have changed.
+    *@param documentIdentifier is the document identifier.
+    *@param componentIdentifier is the component document identifier, if any.
+    *@param version is the document version.
+    */
+    @Override
+    public void recordDocument(String documentIdentifier,
+      String componentIdentifier,
+      String version)
+      throws ManifoldCFException
+    {
       String documentIdentifierHash = ManifoldCF.hash(documentIdentifier);
+      String componentIdentifierHash = computeComponentIDHash(componentIdentifier);
       ingester.documentRecord(
         pipelineSpecification.getBasicPipelineSpecification(),
-        connectionName,documentIdentifierHash,
+        connectionName,documentIdentifierHash,componentIdentifierHash,
         version,currentTime);
       touchedSet.add(documentIdentifier);
+      touchComponentSet(documentIdentifier,componentIdentifierHash);
     }
 
     /** Ingest the current document.
@@ -1462,11 +1548,31 @@ public class WorkerThread extends Thread
     public void ingestDocumentWithException(String documentIdentifier, String version, String documentURI, RepositoryDocument data)
       throws ManifoldCFException, ServiceInterruption, IOException
     {
+      ingestDocumentWithException(documentIdentifier,null,version,documentURI,data);
+    }
+
+    /** Ingest the current document.
+    *@param documentIdentifier is the document's identifier.
+    *@param componentIdentifier is the component document identifier, if any.
+    *@param version is the version of the document, as reported by the getDocumentVersions() method of the
+    *       corresponding repository connector.
+    *@param documentURI is the URI to use to retrieve this document from the search interface (and is
+    *       also the unique key in the index).
+    *@param data is the document data.  The data is closed after ingestion is complete.
+    *@throws IOException only when data stream reading fails.
+    */
+    @Override
+    public void ingestDocumentWithException(String documentIdentifier,
+      String componentIdentifier,
+      String version, String documentURI, RepositoryDocument data)
+      throws ManifoldCFException, ServiceInterruption, IOException
+    {
       // We should not get called here if versions agree, unless the repository
       // connector cannot distinguish between versions - in which case it must
       // always ingest (essentially)
 
       String documentIdentifierHash = ManifoldCF.hash(documentIdentifier);
+      String componentIdentifierHash = computeComponentIDHash(componentIdentifier);
 
       if (data != null)
       {
@@ -1488,8 +1594,8 @@ public class WorkerThread extends Thread
         
       // First, we need to add into the metadata the stuff from the job description.
       ingester.documentIngest(
-        fetchPipelineSpecifications.get(documentIdentifierHash),
-        connectionName,documentIdentifierHash,
+        computePipelineSpecification(documentIdentifierHash,componentIdentifierHash),
+        connectionName,documentIdentifierHash,componentIdentifierHash,
         version,parameterVersion,
         connection.getACLAuthority(),
         data,currentTime,
@@ -1497,6 +1603,7 @@ public class WorkerThread extends Thread
         ingestLogger);
       
       touchedSet.add(documentIdentifier);
+      touchComponentSet(documentIdentifier,componentIdentifierHash);
     }
 
     /** Remove the specified document from the search engine index, while keeping track of the version information
@@ -1504,23 +1611,77 @@ public class WorkerThread extends Thread
     *@param documentIdentifier is the document's local identifier.
     *@param version is the version string to be recorded for the document.
     */
+    @Override
     public void noDocument(String documentIdentifier, String version)
       throws ManifoldCFException, ServiceInterruption
     {
+      noDocument(documentIdentifier,null,version);
+    }
+
+    /** Remove the specified document from the search engine index, and update the
+    * recorded version information for the document.
+    *@param documentIdentifier is the document's local identifier.
+    *@param componentIdentifier is the component document identifier, if any.
+    *@param version is the version string to be recorded for the document.
+    */
+    @Override
+    public void noDocument(String documentIdentifier,
+      String componentIdentifier,
+      String version)
+      throws ManifoldCFException, ServiceInterruption
+    {
       // Special interpretation for empty version string; treat as if the document doesn't exist
       // (by ignoring it and allowing it to be deleted later)
       String documentIdentifierHash = ManifoldCF.hash(documentIdentifier);
+      String componentIdentifierHash = computeComponentIDHash(componentIdentifier);
+
       ingester.documentNoData(
-        fetchPipelineSpecifications.get(documentIdentifierHash),
-        connectionName,documentIdentifierHash,
+        computePipelineSpecification(documentIdentifierHash,componentIdentifierHash),
+        connectionName,documentIdentifierHash,componentIdentifierHash,
         version,parameterVersion,
         connection.getACLAuthority(),
         currentTime,
         ingestLogger);
       
       touchedSet.add(documentIdentifier);
+      touchComponentSet(documentIdentifier,componentIdentifierHash);
+    }
+
+    /** Remove the specified document primary component permanently from the search engine index,
+    * and from the status table.  Use this method when your document has components and
+    * now also has a primary document, but will not have a primary document again for the foreseeable
+    * future.  This is a rare situation.
+    *@param documentIdentifier is the document's identifier.
+    */
+    @Override
+    public void removeDocument(String documentIdentifier)
+      throws ManifoldCFException, ServiceInterruption
+    {
+      String documentIdentifierHash = ManifoldCF.hash(documentIdentifier);
+      ingester.documentRemove(
+        pipelineSpecification.getBasicPipelineSpecification(),
+        connectionName,documentIdentifierHash,null,
+        ingestLogger);
+        
+      // Note that we touched it, so it won't get checked
+      touchedSet.add(documentIdentifier);
+    }
+
+    /** Retain existing document component.  Use this method to signal that an already-existing
+    * document component does not need to be reindexed.  The default behavior is to remove
+    * components that are not mentioned during processing.
+    *@param documentIdentifier is the document's identifier.
+    *@param componentIdentifier is the component document identifier, which cannot be null.
+    */
+    @Override
+    public void retainDocument(String documentIdentifier,
+      String componentIdentifier)
+      throws ManifoldCFException
+    {
+      touchComponentSet(documentIdentifier,computeComponentIDHash(componentIdentifier));
     }
 
+    
     /** Delete the current document from the search engine index, while keeping track of the version information
     * for it (to reduce churn).
     * Use noDocument() above instead.
@@ -1935,8 +2096,36 @@ public class WorkerThread extends Thread
       return ManifoldCF.createJobSpecificString(jobID,simpleString);
     }
 
+    protected void touchComponentSet(String documentIdentifier, String componentIdentifierHash)
+    {
+      if (componentIdentifierHash == null)
+        return;
+      Set<String> components = touchedComponentSet.get(documentIdentifier);
+      if (components == null)
+      {
+        components = new HashSet<String>();
+        touchedComponentSet.put(documentIdentifier,components);
+      }
+      components.add(componentIdentifierHash);
+    }
+    
+    protected IPipelineSpecificationWithVersions computePipelineSpecification(String documentIdentifierHash,
+      String componentIdentifierHash)
+    {
+      return new PipelineSpecificationWithVersions(pipelineSpecification,previousDocuments.get(documentIdentifierHash),componentIdentifierHash);
+    }
+
   }
 
+  protected static String computeComponentIDHash(String componentIdentifier)
+    throws ManifoldCFException
+  {
+    if (componentIdentifier != null)
+      return ManifoldCF.hash(componentIdentifier);
+    else
+      return null;
+  }
+    
   /** DocumentBin class */
   protected static class DocumentBin
   {
@@ -1991,6 +2180,7 @@ public class WorkerThread extends Thread
       }
       return true;
     }
+    
   }
 
   /** Class describing document reference.
@@ -2279,13 +2469,35 @@ public class WorkerThread extends Thread
     *@param documentIdentifier is the document identifier.
     *@return the document version string, or null if the document was never previously indexed.
     */
+    @Override
     public String getIndexedVersionString(String documentIdentifier)
+      throws ManifoldCFException
+    {
+      return getIndexedVersionString(documentIdentifier,null);
+    }
+
+    /** Retrieve a component existing version string given a document identifier.
+    *@param documentIdentifier is the document identifier.
+    *@param componentIdentifier is the component identifier, if any.
+    *@return the document version string, or null of the document component was never previously indexed.
+    */
+    @Override
+    public String getIndexedVersionString(String documentIdentifier, String componentIdentifier)
+      throws ManifoldCFException
     {
       QueuedDocument qd = map.get(documentIdentifier);
-      DocumentIngestStatus status = qd.getLastIngestedStatus(lastOutputConnectionName);
+      DocumentIngestStatusSet status = qd.getLastIngestedStatus(lastOutputConnectionName);
       if (status == null)
         return null;
-      return status.getDocumentVersion();
+      String componentIdentifierHash;
+      if (componentIdentifier == null)
+        componentIdentifierHash = null;
+      else
+        componentIdentifierHash = ManifoldCF.hash(componentIdentifier);
+      DocumentIngestStatus s = status.getComponent(componentIdentifierHash);
+      if (s == null)
+        return null;
+      return s.getDocumentVersion();
     }
 
   }