You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/06/07 13:56:07 UTC

svn commit: r1601092 - in /manifoldcf/branches/CONNECTORS-946: connectors/nulltransformation/connector/src/main/java/org/apache/manifoldcf/agents/transformation/nullconnector/ framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalinges...

Author: kwright
Date: Sat Jun  7 11:56:06 2014
New Revision: 1601092

URL: http://svn.apache.org/r1601092
Log:
Add pipeline code

Added:
    manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputQualifyActivity.java   (with props)
Modified:
    manifoldcf/branches/CONNECTORS-946/connectors/nulltransformation/connector/src/main/java/org/apache/manifoldcf/agents/transformation/nullconnector/NullConnector.java
    manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
    manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputAddActivity.java
    manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java

Modified: manifoldcf/branches/CONNECTORS-946/connectors/nulltransformation/connector/src/main/java/org/apache/manifoldcf/agents/transformation/nullconnector/NullConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-946/connectors/nulltransformation/connector/src/main/java/org/apache/manifoldcf/agents/transformation/nullconnector/NullConnector.java?rev=1601092&r1=1601091&r2=1601092&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-946/connectors/nulltransformation/connector/src/main/java/org/apache/manifoldcf/agents/transformation/nullconnector/NullConnector.java (original)
+++ manifoldcf/branches/CONNECTORS-946/connectors/nulltransformation/connector/src/main/java/org/apache/manifoldcf/agents/transformation/nullconnector/NullConnector.java Sat Jun  7 11:56:06 2014
@@ -71,7 +71,7 @@ public class NullConnector extends org.a
     try
     {
       long binaryLength = document.getBinaryLength();
-      int rval = activities.sendDocument(document);
+      int rval = activities.sendDocument(documentURI,document,authorityNameString);
       length =  new Long(binaryLength);
       resultCode = (rval == DOCUMENTSTATUS_ACCEPTED)?"ACCEPTED":"REJECTED";
       return rval;

Modified: manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java?rev=1601092&r1=1601091&r2=1601092&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java (original)
+++ manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java Sat Jun  7 11:56:06 2014
@@ -235,22 +235,22 @@ public class IncrementalIngester extends
     IOutputCheckActivity activity)
     throws ManifoldCFException, ServiceInterruption
   {
-    // MHL to handle the pipeline
-    IOutputConnection connection = connectionManager.load(outputConnectionName);
-    IOutputConnector connector = outputConnectorPool.grab(connection);
-    if (connector == null)
-      // The connector is not installed; treat this as a service interruption.
-      throw new ServiceInterruption("Output connector not installed",0L);
+    PipelineObject pipeline = pipelineGrab(transformationConnectionNames,outputConnectionName,transformationDescriptions,outputDescription);
+    if (pipeline == null)
+      // A connector is not installed; treat this as a service interruption.
+      throw new ServiceInterruption("One or more connectors are not installed",0L);
     try
     {
-      return connector.checkMimeTypeIndexable(outputDescription,mimeType,activity);
+      return pipeline.checkMimeTypeIndexable(mimeType,activity);
     }
     finally
     {
-      outputConnectorPool.release(connection,connector);
+      pipeline.release();
     }
   }
 
+  
+  
   /** Check if a file is indexable.
   *@param outputConnectionName is the name of the output connection associated with this action.
   *@param outputDescription is the output description string.
@@ -283,19 +283,17 @@ public class IncrementalIngester extends
     IOutputCheckActivity activity)
     throws ManifoldCFException, ServiceInterruption
   {
-    // MHL
-    IOutputConnection connection = connectionManager.load(outputConnectionName);
-    IOutputConnector connector = outputConnectorPool.grab(connection);
-    if (connector == null)
-      // The connector is not installed; treat this as a service interruption.
-      throw new ServiceInterruption("Output connector not installed",0L);
+    PipelineObject pipeline = pipelineGrab(transformationConnectionNames,outputConnectionName,transformationDescriptions,outputDescription);
+    if (pipeline == null)
+      // A connector is not installed; treat this as a service interruption.
+      throw new ServiceInterruption("One or more connectors are not installed",0L);
     try
     {
-      return connector.checkDocumentIndexable(outputDescription,localFile,activity);
+      return pipeline.checkDocumentIndexable(localFile,activity);
     }
     finally
     {
-      outputConnectorPool.release(connection,connector);
+      pipeline.release();
     }
   }
 
@@ -333,19 +331,17 @@ public class IncrementalIngester extends
     IOutputCheckActivity activity)
     throws ManifoldCFException, ServiceInterruption
   {
-    // MHL
-    IOutputConnection connection = connectionManager.load(outputConnectionName);
-    IOutputConnector connector = outputConnectorPool.grab(connection);
-    if (connector == null)
-      // The connector is not installed; treat this as a service interruption.
-      throw new ServiceInterruption("Output connector not installed",0L);
+    PipelineObject pipeline = pipelineGrab(transformationConnectionNames,outputConnectionName,transformationDescriptions,outputDescription);
+    if (pipeline == null)
+      // A connector is not installed; treat this as a service interruption.
+      throw new ServiceInterruption("One or more connectors are not installed",0L);
     try
     {
-      return connector.checkLengthIndexable(outputDescription,length,activity);
+      return pipeline.checkLengthIndexable(length,activity);
     }
     finally
     {
-      outputConnectorPool.release(connection,connector);
+      pipeline.release();
     }
   }
 
@@ -383,19 +379,74 @@ public class IncrementalIngester extends
     IOutputCheckActivity activity)
     throws ManifoldCFException, ServiceInterruption
   {
-    // MHL
-    IOutputConnection connection = connectionManager.load(outputConnectionName);
-    IOutputConnector connector = outputConnectorPool.grab(connection);
-    if (connector == null)
-      // The connector is not installed; treat this as a service interruption.
-      throw new ServiceInterruption("Output connector not installed",0L);
+    PipelineObject pipeline = pipelineGrab(transformationConnectionNames,outputConnectionName,transformationDescriptions,outputDescription);
+    if (pipeline == null)
+      // A connector is not installed; treat this as a service interruption.
+      throw new ServiceInterruption("One or more connectors are not installed",0L);
     try
     {
-      return connector.checkURLIndexable(outputDescription,url,activity);
+      return pipeline.checkURLIndexable(url,activity);
     }
     finally
     {
-      outputConnectorPool.release(connection,connector);
+      pipeline.release();
+    }
+  }
+
+  
+  /** Grab the entire pipeline.
+  *@param transformationConnectionNames - the names of the transformation connections, in order
+  *@param outputConnectionName - the name of the output connection
+  *@param transformationDescriptionStrings - the array of description strings for transformations
+  *@param outputDescriptionString - the output description string
+  *@return the pipeline object, or null if any part of the pipeline cannot be grabbed.
+  */
+  protected PipelineObject pipelineGrab(String[] transformationConnectionNames, String outputConnectionName,
+    String[] transformationDescriptionStrings, String outputDescriptionString)
+    throws ManifoldCFException
+  {
+    // Pick up all needed transformation connectors
+    ITransformationConnection[] transformationConnections = new ITransformationConnection[transformationConnectionNames.length];
+    
+    for (int i = 0; i < transformationConnections.length; i++)
+    {
+      transformationConnections[i] = transformationConnectionManager.load(transformationConnectionNames[i]);
+    }
+    
+    ITransformationConnector[] transformationConnectors = transformationConnectorPool.grabMultiple(transformationConnectionNames,transformationConnections);
+    for (ITransformationConnector c : transformationConnectors)
+    {
+      if (c == null)
+      {
+        transformationConnectorPool.releaseMultiple(transformationConnections,transformationConnectors);
+        return null;
+      }
+    }
+    
+    // Last, pick up output connector.  If it fails we have to release the transformation connectors.
+    try
+    {
+      IOutputConnection connection = connectionManager.load(outputConnectionName);
+      IOutputConnector connector = outputConnectorPool.grab(connection);
+      if (connector == null)
+      {
+        transformationConnectorPool.releaseMultiple(transformationConnections,transformationConnectors);
+        return null;
+      }
+      return new PipelineObject(transformationConnections,transformationConnectors,connection,connector,
+        transformationDescriptionStrings,outputDescriptionString);
+    }
+    catch (Throwable e)
+    {
+      transformationConnectorPool.releaseMultiple(transformationConnections,transformationConnectors);
+      if (e instanceof ManifoldCFException)
+        throw (ManifoldCFException)e;
+      else if (e instanceof RuntimeException)
+        throw (RuntimeException)e;
+      else if (e instanceof Error)
+        throw (Error)e;
+      else
+        throw new RuntimeException("Unexpected exception type: "+e.getClass().getName()+": "+e.getMessage(),e);
     }
   }
 
@@ -472,10 +523,6 @@ public class IncrementalIngester extends
     long recordTime, IOutputActivity activities)
     throws ManifoldCFException, ServiceInterruption
   {
-    activities = new OutputActivitiesWrapper(activities,outputConnectionName);
-
-    IOutputConnection connection = connectionManager.load(outputConnectionName);
-
     String docKey = makeKey(identifierClass,identifierHash);
 
     if (Logging.ingest.isDebugEnabled())
@@ -483,7 +530,14 @@ public class IncrementalIngester extends
       Logging.ingest.debug("Recording document '"+docKey+"' for output connection '"+outputConnectionName+"'");
     }
 
-    performIngestion(connection,docKey,documentVersion,null,null,null,null,recordTime,null,activities);
+    performIngestion(new String[0],new String[0],
+      outputConnectionName,null,
+      docKey,documentVersion,null,
+      null,
+      null,
+      recordTime,
+      null,
+      activities);
   }
 
   /** Ingest a document.
@@ -604,26 +658,26 @@ public class IncrementalIngester extends
     IOutputActivity activities)
     throws ManifoldCFException, ServiceInterruption
   {
-    activities = new OutputActivitiesWrapper(activities,outputConnectionName);
-
-    // MHL
-    IOutputConnection connection = connectionManager.load(outputConnectionName);
-
     String docKey = makeKey(identifierClass,identifierHash);
 
     if (Logging.ingest.isDebugEnabled())
     {
       Logging.ingest.debug("Ingesting document '"+docKey+"' into output connection '"+outputConnectionName+"'");
     }
-
-    return performIngestion(connection,docKey,documentVersion,outputVersion,parameterVersion,authorityName,
-      data,ingestTime,documentURI,activities);
+    return performIngestion(transformationConnectionNames,transformationVersions,
+      outputConnectionName,outputVersion,
+      docKey,documentVersion,parameterVersion,
+      authorityName,
+      data,
+      ingestTime,documentURI,
+      activities);
   }
 
-  
   /** Do the actual ingestion, or just record it if there's nothing to ingest. */
-  protected boolean performIngestion(IOutputConnection connection,
-    String docKey, String documentVersion, String outputVersion, String parameterVersion,
+  protected boolean performIngestion(
+    String[] transformationConnectionNames, String[] transformationVersions,
+    String outputConnectionName, String outputVersion,
+    String docKey, String documentVersion, String parameterVersion,
     String authorityNameString,
     RepositoryDocument data,
     long ingestTime, String documentURI,
@@ -651,7 +705,7 @@ public class IncrementalIngester extends
         ArrayList list = new ArrayList();
         String query = buildConjunctionClause(list,new ClauseDescription[]{
           new UnitaryClause(docKeyField,docKey),
-          new UnitaryClause(outputConnNameField,connection.getName())});
+          new UnitaryClause(outputConnNameField,outputConnectionName)});
           
         IResultSet set = performQuery("SELECT "+docURIField+","+uriHashField+","+lastOutputVersionField+" FROM "+getTableName()+
           " WHERE "+query,list,null,null);
@@ -697,9 +751,9 @@ public class IncrementalIngester extends
     String[] lockArray = new String[uriCount];
     uriCount = 0;
     if (documentURI != null)
-      lockArray[uriCount++] = connection.getName()+":"+documentURI;
+      lockArray[uriCount++] = outputConnectionName+":"+documentURI;
     if (oldURI != null && (documentURI == null || !documentURI.equals(oldURI)))
-      lockArray[uriCount++] = connection.getName()+":"+oldURI;
+      lockArray[uriCount++] = outputConnectionName+":"+oldURI;
 
     lockManager.enterCriticalSections(null,null,lockArray);
     try
@@ -713,10 +767,11 @@ public class IncrementalIngester extends
         list.clear();
         String query = buildConjunctionClause(list,new ClauseDescription[]{
           new UnitaryClause(uriHashField,"=",oldURIHash),
-          new UnitaryClause(outputConnNameField,"=",connection.getName())});
+          new UnitaryClause(outputConnNameField,"=",outputConnectionName)});
         list.add(docKey);
         performDelete("WHERE "+query+" AND "+docKeyField+"!=?",list,null);
-        removeDocument(connection,oldURI,oldOutputVersion,activities);
+        IOutputConnection connection = connectionManager.load(outputConnectionName);
+        removeDocument(connection,oldURI,oldOutputVersion,new OutputRemoveActivitiesWrapper(activities,outputConnectionName));
       }
 
       if (documentURI != null)
@@ -725,7 +780,7 @@ public class IncrementalIngester extends
         list.clear();
         String query = buildConjunctionClause(list,new ClauseDescription[]{
           new UnitaryClause(uriHashField,"=",documentURIHash),
-          new UnitaryClause(outputConnNameField,"=",connection.getName())});
+          new UnitaryClause(outputConnNameField,"=",outputConnectionName)});
         list.add(docKey);
         performDelete("WHERE "+query+" AND "+ docKeyField+"!=?",list,null);
       }
@@ -750,15 +805,21 @@ public class IncrementalIngester extends
         // This is a marker that says "something is there"; it has an empty version, which indicates
         // that we don't know anything about it.  That means it will be reingested when the
         // next version comes along, and will be deleted if called for also.
-        noteDocumentIngest(connection.getName(),docKey,null,null,null,null,ingestTime,documentURI,documentURIHash);
-        int result = addOrReplaceDocument(connection,documentURI,outputVersion,data,authorityNameString,activities);
-        noteDocumentIngest(connection.getName(),docKey,documentVersion,outputVersion,parameterVersion,authorityNameString,ingestTime,documentURI,documentURIHash);
+        // MHL -- needs to record info about transformations!!
+        noteDocumentIngest(outputConnectionName,docKey,null,null,null,null,ingestTime,documentURI,documentURIHash);
+        int result = addOrReplaceDocument(transformationConnectionNames,transformationVersions,
+          outputConnectionName,outputVersion,
+          documentURI,data,authorityNameString,
+          activities);
+        // MHL -- needs to record info about transformations!!
+        noteDocumentIngest(outputConnectionName,docKey,documentVersion,outputVersion,parameterVersion,authorityNameString,ingestTime,documentURI,documentURIHash);
         return result == IOutputConnector.DOCUMENTSTATUS_ACCEPTED;
       }
 
       // If we get here, it means we are noting that the document was examined, but that no change was required.  This is signaled
       // to noteDocumentIngest by having the null documentURI.
-      noteDocumentIngest(connection.getName(),docKey,documentVersion,outputVersion,parameterVersion,authorityNameString,ingestTime,null,null);
+      // MHL -- needs to record info about transformations!!
+      noteDocumentIngest(outputConnectionName,docKey,documentVersion,outputVersion,parameterVersion,authorityNameString,ingestTime,null,null);
       return true;
     }
     finally
@@ -1875,24 +1936,29 @@ public class IncrementalIngester extends
 
   /** Add or replace document, using the specified output connection, via the standard pool.
   */
-  protected int addOrReplaceDocument(IOutputConnection connection, String documentURI, String outputDescription,
-    RepositoryDocument document, String authorityNameString,
-    IOutputAddActivity activities)
+  protected int addOrReplaceDocument(
+    String[] transformationConnectionNames, String[] transformationDescriptionStrings,
+    String outputConnectionName, String outputDescriptionString,
+    String documentURI, RepositoryDocument document, String authorityNameString,
+    IOutputAddActivity finalActivities)
     throws ManifoldCFException, ServiceInterruption
   {
     // Set indexing date
     document.setIndexingDate(new Date());
-    IOutputConnector connector = outputConnectorPool.grab(connection);
-    if (connector == null)
-      // The connector is not installed; treat this as a service interruption.
-      throw new ServiceInterruption("Output connector not installed",0L);
+    
+    // Set up a pipeline
+    PipelineObject pipeline = pipelineGrab(transformationConnectionNames,outputConnectionName,
+      transformationDescriptionStrings,outputDescriptionString);
+    if (pipeline == null)
+      // A connector is not installed; treat this as a service interruption.
+      throw new ServiceInterruption("Pipeline connector not installed",0L);
     try
     {
-      return connector.addOrReplaceDocument(documentURI,outputDescription,document,authorityNameString,activities);
+      return pipeline.addOrReplaceDocument(documentURI,document,authorityNameString,finalActivities);
     }
     finally
     {
-      outputConnectorPool.release(connection,connector);
+      pipeline.release();
     }
   }
 
@@ -1983,6 +2049,45 @@ public class IncrementalIngester extends
 
   }
   
+  /** Wrapper class for add activity.  This handles conversion of transformation connector activity logging to 
+  * qualified activity names */
+  protected static class TransformationRecordingActivity implements IOutputHistoryActivity
+  {
+    protected final IOutputHistoryActivity activityProvider;
+    protected final String transformationConnectionName;
+    
+    public TransformationRecordingActivity(IOutputHistoryActivity activityProvider, String transformationConnectionName)
+    {
+      this.activityProvider = activityProvider;
+      this.transformationConnectionName = transformationConnectionName;
+    }
+    
+    /** Record time-stamped information about the activity of the transformation connector.
+    *@param startTime is either null or the time since the start of epoch in milliseconds (Jan 1, 1970).  Every
+    *       activity has an associated time; the startTime field records when the activity began.  A null value
+    *       indicates that the start time and the finishing time are the same.
+    *@param activityType is a string which is fully interpretable only in the context of the connector involved, which is
+    *       used to categorize what kind of activity is being recorded.  For example, a web connector might record a
+    *       "fetch document" activity.  Cannot be null.
+    *@param dataSize is the number of bytes of data involved in the activity, or null if not applicable.
+    *@param entityURI is a (possibly long) string which identifies the object involved in the history record.
+    *       The interpretation of this field will differ from connector to connector.  May be null.
+    *@param resultCode contains a terse description of the result of the activity.  The description is limited in
+    *       size to 255 characters, and can be interpreted only in the context of the current connector.  May be null.
+    *@param resultDescription is a (possibly long) human-readable string which adds detail, if required, to the result
+    *       described in the resultCode field.  This field is not meant to be queried on.  May be null.
+    */
+    @Override
+    public void recordActivity(Long startTime, String activityType, Long dataSize,
+      String entityURI, String resultCode, String resultDescription)
+      throws ManifoldCFException
+    {
+      activityProvider.recordActivity(startTime,ManifoldCF.qualifyTransformationActivityName(activityType,transformationConnectionName),
+        dataSize,entityURI,resultCode,resultDescription);
+    }
+
+  }
+
   protected static class OutputRemoveActivitiesWrapper extends OutputRecordingActivity implements IOutputRemoveActivity
   {
     protected final IOutputRemoveActivity activities;
@@ -1995,11 +2100,11 @@ public class IncrementalIngester extends
 
   }
   
-  protected static class OutputActivitiesWrapper extends OutputRecordingActivity implements IOutputActivity
+  protected static class OutputAddActivitiesWrapper extends OutputRecordingActivity implements IOutputAddActivity
   {
-    protected final IOutputActivity activities;
+    protected final IOutputAddActivity activities;
     
-    public OutputActivitiesWrapper(IOutputActivity activities, String outputConnectionName)
+    public OutputAddActivitiesWrapper(IOutputAddActivity activities, String outputConnectionName)
     {
       super(activities,outputConnectionName);
       this.activities = activities;
@@ -2019,13 +2124,15 @@ public class IncrementalIngester extends
     }
 
     /** Send a document via the pipeline to the next output connection.
+    *@param documentURI is the document's URI.
     *@param document is the document data to be processed (handed to the output data store).
+    *@param authorityNameString is the authority name string that should be used to qualify the document's access tokens.
     *@return the document status (accepted or permanently rejected); return codes are listed in IPipelineConnector.
     */
-    public int sendDocument(RepositoryDocument document)
+    public int sendDocument(String documentURI, RepositoryDocument document, String authorityNameString)
       throws ManifoldCFException, ServiceInterruption
     {
-      return activities.sendDocument(document);
+      return activities.sendDocument(documentURI,document,authorityNameString);
     }
 
     /** Detect if a mime type is acceptable downstream or not.  This method is used to determine whether it makes sense to fetch a document
@@ -2079,4 +2186,370 @@ public class IncrementalIngester extends
 
   }
   
+  protected static class OutputActivitiesWrapper extends OutputAddActivitiesWrapper implements IOutputActivity
+  {
+    protected final IOutputActivity activities;
+    
+    public OutputActivitiesWrapper(IOutputActivity activities, String outputConnectionName)
+    {
+      super(activities,outputConnectionName);
+      this.activities = activities;
+    }
+  }
+  
+  protected class PipelineObject
+  {
+    public final IOutputConnection outputConnection;
+    public final IOutputConnector outputConnector;
+    public final ITransformationConnection[] transformationConnections;
+    public final ITransformationConnector[] transformationConnectors;
+    public final String outputDescription;
+    public final String[] transformationDescriptions;
+    
+    public PipelineObject(
+      ITransformationConnection[] transformationConnections, ITransformationConnector[] transformationConnectors,
+      IOutputConnection outputConnection, IOutputConnector outputConnector,
+      String[] transformationDescriptions, String outputDescription)
+    {
+      this.transformationConnections = transformationConnections;
+      this.transformationConnectors = transformationConnectors;
+      this.outputConnection = outputConnection;
+      this.outputConnector = outputConnector;
+      this.outputDescription = outputDescription;
+      this.transformationDescriptions = transformationDescriptions;
+    }
+    
+    public boolean checkMimeTypeIndexable(String mimeType, IOutputCheckActivity finalActivity)
+      throws ManifoldCFException, ServiceInterruption
+    {
+      PipelineCheckEntryPoint entryPoint = buildCheckPipeline(finalActivity);
+      return entryPoint.getPipelineConnector().checkMimeTypeIndexable(entryPoint.getPipelineDescriptionString(),mimeType,entryPoint.getPipelineCheckActivity());
+    }
+
+    public boolean checkDocumentIndexable(File localFile, IOutputCheckActivity finalActivity)
+      throws ManifoldCFException, ServiceInterruption
+    {
+      PipelineCheckEntryPoint entryPoint = buildCheckPipeline(finalActivity);
+      return entryPoint.getPipelineConnector().checkDocumentIndexable(entryPoint.getPipelineDescriptionString(),localFile,entryPoint.getPipelineCheckActivity());
+    }
+
+    public boolean checkLengthIndexable(long length, IOutputCheckActivity finalActivity)
+      throws ManifoldCFException, ServiceInterruption
+    {
+      PipelineCheckEntryPoint entryPoint = buildCheckPipeline(finalActivity);
+      return entryPoint.getPipelineConnector().checkLengthIndexable(entryPoint.getPipelineDescriptionString(),length,entryPoint.getPipelineCheckActivity());
+    }
+    
+    public boolean checkURLIndexable(String uri, IOutputCheckActivity finalActivity)
+      throws ManifoldCFException, ServiceInterruption
+    {
+      PipelineCheckEntryPoint entryPoint = buildCheckPipeline(finalActivity);
+      return entryPoint.getPipelineConnector().checkURLIndexable(entryPoint.getPipelineDescriptionString(),uri,entryPoint.getPipelineCheckActivity());
+    }
+
+    public int addOrReplaceDocument(String documentURI, RepositoryDocument document, String authorityNameString, IOutputAddActivity finalActivity)
+      throws ManifoldCFException, ServiceInterruption
+    {
+      PipelineAddEntryPoint entryPoint = buildAddPipeline(finalActivity);
+      return entryPoint.getPipelineConnector().addOrReplaceDocument(documentURI,entryPoint.getPipelineDescriptionString(),
+        document,authorityNameString,entryPoint.getPipelineAddActivity());
+    }
+    
+    public void release()
+      throws ManifoldCFException
+    {
+      outputConnectorPool.release(outputConnection,outputConnector);
+      transformationConnectorPool.releaseMultiple(transformationConnections,transformationConnectors);
+    }
+    
+    protected PipelineCheckEntryPoint buildCheckPipeline(IOutputCheckActivity finalActivity)
+    {
+      // Build output stage first
+      PipelineCheckEntryPoint currentStage = new PipelineCheckEntryPoint(outputConnector,outputDescription,finalActivity);
+      // Go through transformations backwards
+      int i = transformationConnectors.length;
+      while (i > 0)
+      {
+        i--;
+        currentStage = new PipelineCheckEntryPoint(transformationConnectors[i],transformationDescriptions[i],
+          new PipelineCheckActivity(currentStage.getPipelineConnector(),currentStage.getPipelineDescriptionString(),currentStage.getPipelineCheckActivity()));
+      }
+      return currentStage;
+    }
+
+    protected PipelineAddEntryPoint buildAddPipeline(IOutputAddActivity finalActivity)
+    {
+      // Build output stage first
+      PipelineAddEntryPoint currentStage = new PipelineAddEntryPoint(outputConnector,outputDescription,
+        new OutputAddActivitiesWrapper(finalActivity,outputConnection.getName()));
+      // Go through transformations backwards
+      int i = transformationConnectors.length;
+      while (i > 0)
+      {
+        i--;
+        currentStage = new PipelineAddEntryPoint(transformationConnectors[i],transformationDescriptions[i],
+          new PipelineAddActivity(currentStage.getPipelineConnector(),currentStage.getPipelineDescriptionString(),currentStage.getPipelineAddActivity(),
+            new TransformationRecordingActivity(finalActivity,transformationConnections[i].getName()),finalActivity));
+      }
+      return currentStage;
+    }
+
+  }
+
+  /** This class describes the entry stage of a check pipeline.
+  */
+  public static class PipelineCheckEntryPoint
+  {
+    protected final IPipelineConnector pipelineConnector;
+    protected final String pipelineDescriptionString;
+    protected final IOutputCheckActivity checkActivity;
+    
+    public PipelineCheckEntryPoint(IPipelineConnector pipelineConnector,
+      String pipelineDescriptionString,
+      IOutputCheckActivity checkActivity)
+    {
+      this.pipelineConnector = pipelineConnector;
+      this.pipelineDescriptionString = pipelineDescriptionString;
+      this.checkActivity = checkActivity;
+    }
+    
+    public IPipelineConnector getPipelineConnector()
+    {
+      return pipelineConnector;
+    }
+    
+    public String getPipelineDescriptionString()
+    {
+      return pipelineDescriptionString;
+    }
+    
+    public IOutputCheckActivity getPipelineCheckActivity()
+    {
+      return checkActivity;
+    }
+  }
+  
+  /** Adapter that exposes one pipeline stage as an IOutputCheckActivity, so that stages can be
+  * joined together for check operations.  Each check is forwarded to the wrapped pipeline
+  * connector, along with the description string and check activity supplied at construction.
+  */
+  public static class PipelineCheckActivity implements IOutputCheckActivity
+  {
+    protected final IPipelineConnector pipelineConnector;
+    protected final String pipelineDescriptionString;
+    protected final IOutputCheckActivity checkActivity;
+
+    /** Constructor.
+    *@param pipelineConnector is the connector every check is forwarded to.
+    *@param pipelineDescriptionString is the description string passed to the connector on each call.
+    *@param checkActivity is the check activity passed to the connector on each call.
+    */
+    public PipelineCheckActivity(IPipelineConnector pipelineConnector, String pipelineDescriptionString, IOutputCheckActivity checkActivity)
+    {
+      this.checkActivity = checkActivity;
+      this.pipelineDescriptionString = pipelineDescriptionString;
+      this.pipelineConnector = pipelineConnector;
+    }
+
+    /** Ask the pipeline connector whether a mime type is acceptable.  Used to decide whether
+    * fetching a document makes sense at all.
+    *@param mimeType is the mime type of the document.
+    *@return true if the connector reports the mime type as indexable.
+    */
+    @Override
+    public boolean checkMimeTypeIndexable(String mimeType)
+      throws ManifoldCFException, ServiceInterruption
+    {
+      final boolean mimeTypeAccepted =
+        pipelineConnector.checkMimeTypeIndexable(pipelineDescriptionString,mimeType,checkActivity);
+      return mimeTypeAccepted;
+    }
+
+    /** Ask the pipeline connector whether a document, supplied as a File object, is acceptable.
+    * Used to decide whether the document actually needs to be transferred; the hook exists mainly
+    * for search engines that handle only a small set of file types.
+    *@param localFile is the local file to check.
+    *@return true if the connector reports the file as indexable.
+    */
+    @Override
+    public boolean checkDocumentIndexable(File localFile)
+      throws ManifoldCFException, ServiceInterruption
+    {
+      final boolean documentAccepted =
+        pipelineConnector.checkDocumentIndexable(pipelineDescriptionString,localFile,checkActivity);
+      return documentAccepted;
+    }
+
+    /** Ask the pipeline connector whether a document length is acceptable.  Used to decide
+    * whether to fetch a document in the first place.
+    *@param length is the length of the document.
+    *@return true if the connector reports the length as indexable.
+    */
+    @Override
+    public boolean checkLengthIndexable(long length)
+      throws ManifoldCFException, ServiceInterruption
+    {
+      final boolean lengthAccepted =
+        pipelineConnector.checkLengthIndexable(pipelineDescriptionString,length,checkActivity);
+      return lengthAccepted;
+    }
+
+    /** Ask the pipeline connector whether a document URL is acceptable.  Used to filter out,
+    * in advance, documents that cannot be indexed.
+    *@param url is the URL of the document.
+    *@return true if the connector reports the URL as indexable.
+    */
+    @Override
+    public boolean checkURLIndexable(String url)
+      throws ManifoldCFException, ServiceInterruption
+    {
+      final boolean urlAccepted =
+        pipelineConnector.checkURLIndexable(pipelineDescriptionString,url,checkActivity);
+      return urlAccepted;
+    }
+
+  }
+
+  /** Immutable holder describing the entry stage of an add pipeline: a pipeline connector,
+  * its description string, and an add activity.
+  */
+  public static class PipelineAddEntryPoint
+  {
+    protected final IPipelineConnector pipelineConnector;
+    protected final String pipelineDescriptionString;
+    protected final IOutputAddActivity addActivity;
+
+    /** Constructor.
+    *@param pipelineConnector is the pipeline connector to store.
+    *@param pipelineDescriptionString is the description string to store.
+    *@param addActivity is the add activity to store.
+    */
+    public PipelineAddEntryPoint(IPipelineConnector pipelineConnector, String pipelineDescriptionString, IOutputAddActivity addActivity)
+    {
+      this.addActivity = addActivity;
+      this.pipelineDescriptionString = pipelineDescriptionString;
+      this.pipelineConnector = pipelineConnector;
+    }
+
+    /** Get the pipeline connector.
+    *@return the connector given to the constructor.
+    */
+    public IPipelineConnector getPipelineConnector()
+    {
+      return this.pipelineConnector;
+    }
+
+    /** Get the pipeline description string.
+    *@return the description string given to the constructor.
+    */
+    public String getPipelineDescriptionString()
+    {
+      return this.pipelineDescriptionString;
+    }
+
+    /** Get the add activity.
+    *@return the add activity given to the constructor.
+    */
+    public IOutputAddActivity getPipelineAddActivity()
+    {
+      return this.addActivity;
+    }
+  }
+
+  /** This class is used to join together pipeline stages for add operations.
+  * Indexability checks and document sends are forwarded to the wrapped pipeline connector,
+  * using the description string and add activity supplied at construction.  Access token
+  * qualification and history recording are not staged; they go directly to the separately
+  * supplied "final" activities.
+  */
+  public static class PipelineAddActivity implements IOutputAddActivity
+  {
+    protected final IPipelineConnector pipelineConnector;
+    protected final String pipelineDescriptionString;
+    protected final IOutputAddActivity addActivity;
+    protected final IOutputHistoryActivity finalHistoryActivity;
+    protected final IOutputQualifyActivity finalQualifyActivity;
+
+    /** Constructor.
+    *@param pipelineConnector is the connector that checks and document sends are forwarded to.
+    *@param pipelineDescriptionString is the description string passed to the connector on each call.
+    *@param addActivity is the add activity passed to the connector on each call.
+    *@param finalHistoryActivity is the activity that receives all recordActivity() calls.
+    *@param finalQualifyActivity is the activity that receives all qualifyAccessToken() calls.
+    */
+    public PipelineAddActivity(IPipelineConnector pipelineConnector,
+      String pipelineDescriptionString,
+      IOutputAddActivity addActivity,
+      IOutputHistoryActivity finalHistoryActivity,
+      IOutputQualifyActivity finalQualifyActivity)
+    {
+      this.pipelineConnector = pipelineConnector;
+      this.pipelineDescriptionString = pipelineDescriptionString;
+      this.addActivity = addActivity;
+      this.finalHistoryActivity = finalHistoryActivity;
+      this.finalQualifyActivity = finalQualifyActivity;
+    }
+
+    /** Detect if a mime type is acceptable downstream or not.  This method is used to determine whether it makes sense to fetch a document
+    * in the first place.
+    *@param mimeType is the mime type of the document.
+    *@return true if the mime type can be accepted by the downstream connection.
+    */
+    @Override
+    public boolean checkMimeTypeIndexable(String mimeType)
+      throws ManifoldCFException, ServiceInterruption
+    {
+      return pipelineConnector.checkMimeTypeIndexable(pipelineDescriptionString,mimeType,addActivity);
+    }
+
+    /** Pre-determine whether a document (passed here as a File object) is acceptable downstream.  This method is
+    * used to determine whether a document needs to be actually transferred.  This hook is provided mainly to support
+    * search engines that only handle a small set of accepted file types.
+    *@param localFile is the local file to check.
+    *@return true if the file is acceptable by the downstream connection.
+    */
+    @Override
+    public boolean checkDocumentIndexable(File localFile)
+      throws ManifoldCFException, ServiceInterruption
+    {
+      return pipelineConnector.checkDocumentIndexable(pipelineDescriptionString,localFile,addActivity);
+    }
+
+    /** Pre-determine whether a document's length is acceptable downstream.  This method is used
+    * to determine whether to fetch a document in the first place.
+    *@param length is the length of the document.
+    *@return true if the file is acceptable by the downstream connection.
+    */
+    @Override
+    public boolean checkLengthIndexable(long length)
+      throws ManifoldCFException, ServiceInterruption
+    {
+      return pipelineConnector.checkLengthIndexable(pipelineDescriptionString,length,addActivity);
+    }
+
+    /** Pre-determine whether a document's URL is acceptable downstream.  This method is used
+    * to help filter out documents that cannot be indexed in advance.
+    *@param url is the URL of the document.
+    *@return true if the file is acceptable by the downstream connection.
+    */
+    @Override
+    public boolean checkURLIndexable(String url)
+      throws ManifoldCFException, ServiceInterruption
+    {
+      return pipelineConnector.checkURLIndexable(pipelineDescriptionString,url,addActivity);
+    }
+
+    /** Send a document via the pipeline to the next output connection.
+    *@param documentURI is the document's URI.
+    *@param document is the document data to be processed (handed to the output data store).
+    *@param authorityNameString is the authority name string that should be used to qualify the document's access tokens.
+    *@return the document status (accepted or permanently rejected); return codes are listed in IPipelineConnector.
+    */
+    @Override
+    public int sendDocument(String documentURI, RepositoryDocument document, String authorityNameString)
+      throws ManifoldCFException, ServiceInterruption
+    {
+      // This goes to the next pipeline stage.
+      return pipelineConnector.addOrReplaceDocument(documentURI,pipelineDescriptionString,
+        document,authorityNameString,addActivity);
+    }
+
+    /** Qualify an access token appropriately, to match access tokens as returned by mod_aa.  This method
+    * includes the authority name with the access token, if any, so that each authority may establish its own token space.
+    *@param authorityNameString is the name of the authority to use to qualify the access token.
+    *@param accessToken is the raw, repository access token.
+    *@return the properly qualified access token.
+    */
+    @Override
+    public String qualifyAccessToken(String authorityNameString, String accessToken)
+      throws ManifoldCFException
+    {
+      // This functionality does not need to be staged; we just want to vector through to the final stage directly.
+      return finalQualifyActivity.qualifyAccessToken(authorityNameString,accessToken);
+    }
+
+    /** Record time-stamped information about the activity of the output connector.
+    *@param startTime is either null or the time since the start of epoch in milliseconds (Jan 1, 1970).  Every
+    *       activity has an associated time; the startTime field records when the activity began.  A null value
+    *       indicates that the start time and the finishing time are the same.
+    *@param activityType is a string which is fully interpretable only in the context of the connector involved, which is
+    *       used to categorize what kind of activity is being recorded.  For example, a web connector might record a
+    *       "fetch document" activity.  Cannot be null.
+    *@param dataSize is the number of bytes of data involved in the activity, or null if not applicable.
+    *@param entityURI is a (possibly long) string which identifies the object involved in the history record.
+    *       The interpretation of this field will differ from connector to connector.  May be null.
+    *@param resultCode contains a terse description of the result of the activity.  The description is limited in
+    *       size to 255 characters, and can be interpreted only in the context of the current connector.  May be null.
+    *@param resultDescription is a (possibly long) human-readable string which adds detail, if required, to the result
+    *       described in the resultCode field.  This field is not meant to be queried on.  May be null.
+    */
+    @Override
+    public void recordActivity(Long startTime, String activityType, Long dataSize,
+      String entityURI, String resultCode, String resultDescription)
+      throws ManifoldCFException
+    {
+      // Each stage of the pipeline uses a specific activity for recording history, but it's not fundamentally
+      // pipelined
+      finalHistoryActivity.recordActivity(startTime,activityType,dataSize,entityURI,resultCode,resultDescription);
+    }
+
+  }
+  
 }

Modified: manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputAddActivity.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputAddActivity.java?rev=1601092&r1=1601091&r2=1601092&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputAddActivity.java (original)
+++ manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputAddActivity.java Sat Jun  7 11:56:06 2014
@@ -24,24 +24,17 @@ import org.apache.manifoldcf.agents.inte
 /** This interface abstracts from the activities that an output connector can do
 when adding or replacing documents.
+Access token qualification (qualifyAccessToken) is inherited from IOutputQualifyActivity rather than declared here.
 */
-public interface IOutputAddActivity extends IOutputHistoryActivity,IOutputCheckActivity
+public interface IOutputAddActivity extends IOutputQualifyActivity,IOutputHistoryActivity,IOutputCheckActivity
 {
   public static final String _rcsid = "@(#)$Id: IOutputAddActivity.java 988245 2010-08-23 18:39:35Z kwright $";
 
-  /** Qualify an access token appropriately, to match access tokens as returned by mod_aa.  This method
-  * includes the authority name with the access token, if any, so that each authority may establish its own token space.
-  *@param authorityNameString is the name of the authority to use to qualify the access token.
-  *@param accessToken is the raw, repository access token.
-  *@return the properly qualified access token.
-  */
-  public String qualifyAccessToken(String authorityNameString, String accessToken)
-    throws ManifoldCFException;
-
   /** Send a document via the pipeline to the next output connection.
+  *@param documentURI is the document's URI.
   *@param document is the document data to be processed (handed to the output data store).
+  *@param authorityNameString is the authority name string that should be used to qualify the document's access tokens.
   *@return the document status (accepted or permanently rejected); return codes are listed in IPipelineConnector.
   */
-  public int sendDocument(RepositoryDocument document)
+  public int sendDocument(String documentURI, RepositoryDocument document, String authorityNameString)
     throws ManifoldCFException, ServiceInterruption;
 
 }

Added: manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputQualifyActivity.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputQualifyActivity.java?rev=1601092&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputQualifyActivity.java (added)
+++ manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputQualifyActivity.java Sat Jun  7 11:56:06 2014
@@ -0,0 +1,39 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.agents.interfaces;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.agents.interfaces.*;
+
+/** This interface abstracts the access token qualification activity available to an output connector:
+* combining an authority name with a raw access token to obtain the qualified token.
+*/
+public interface IOutputQualifyActivity
+{
+  public static final String _rcsid = "@(#)$Id$";
+
+  /** Produce the qualified form of an access token, so that it matches the access tokens returned by mod_aa.
+  * The authority name, if any, is included with the access token, which lets each authority establish
+  * its own token space.
+  *@param authorityNameString names the authority used to qualify the access token.
+  *@param accessToken is the raw access token, as obtained from the repository.
+  *@return the properly qualified access token.
+  */
+  public String qualifyAccessToken(String authorityNameString, String accessToken) throws ManifoldCFException;
+}

Propchange: manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputQualifyActivity.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputQualifyActivity.java
------------------------------------------------------------------------------
    svn:keywords = Id

Modified: manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1601092&r1=1601091&r2=1601092&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java (original)
+++ manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java Sat Jun  7 11:56:06 2014
@@ -2643,12 +2643,13 @@ public class WorkerThread extends Thread
           return URLEncoder.encode(authorityNameString) + ":" + URLEncoder.encode(accessToken);
     }
 
-    /** Send adocument via the pipeline to the next connection.
+    /** Send a document via the pipeline to the next output connection.
+    *@param documentURI is the document's URI.
     *@param document is the document data to be processed (handed to the output data store).
+    *@param authorityNameString is the authority name string that should be used to qualify the document's access tokens.
     *@return the document status (accepted or permanently rejected); return codes are listed in IPipelineConnector.
     */
-    @Override
-    public int sendDocument(RepositoryDocument document)
+    public int sendDocument(String documentURI, RepositoryDocument document, String authorityNameString)
       throws ManifoldCFException, ServiceInterruption
     {
       // No downstream connection at output connection level.