You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/06/06 14:35:34 UTC

svn commit: r1600869 - in /manifoldcf/branches/CONNECTORS-946/framework: agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/ agents/src/main/java/org/apache/manifoldcf/agents/interfaces/ agents/src/main/java/org/apache/manifoldcf/agent...

Author: kwright
Date: Fri Jun  6 12:35:33 2014
New Revision: 1600869

URL: http://svn.apache.org/r1600869
Log:
Convert WorkerThread to the new APIs

Modified:
    manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
    manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/OutputConnectionManagerFactory.java
    manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/TransformationConnectionManagerFactory.java
    manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/ManifoldCF.java
    manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java
    manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java

Modified: manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java?rev=1600869&r1=1600868&r2=1600869&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java (original)
+++ manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java Fri Jun  6 12:35:33 2014
@@ -472,6 +472,8 @@ public class IncrementalIngester extends
     long recordTime, IOutputActivity activities)
     throws ManifoldCFException, ServiceInterruption
   {
+    activities = new OutputActivitiesWrapper(activities,outputConnectionName);
+
     IOutputConnection connection = connectionManager.load(outputConnectionName);
 
     String docKey = makeKey(identifierClass,identifierHash);
@@ -602,6 +604,8 @@ public class IncrementalIngester extends
     IOutputActivity activities)
     throws ManifoldCFException, ServiceInterruption
   {
+    activities = new OutputActivitiesWrapper(activities,outputConnectionName);
+
     // MHL
     IOutputConnection connection = connectionManager.load(outputConnectionName);
 
@@ -897,6 +901,7 @@ public class IncrementalIngester extends
     IOutputRemoveActivity activities)
     throws ManifoldCFException, ServiceInterruption
   {
+
     // Segregate request by connection names
     HashMap keyMap = new HashMap();
     int i = 0;
@@ -945,6 +950,8 @@ public class IncrementalIngester extends
     IOutputRemoveActivity activities)
     throws ManifoldCFException, ServiceInterruption
   {
+    activities = new OutputRemoveActivitiesWrapper(activities,outputConnectionName);
+
     IOutputConnection connection = connectionManager.load(outputConnectionName);
 
     if (Logging.ingest.isDebugEnabled())
@@ -1848,9 +1855,17 @@ public class IncrementalIngester extends
       {
         Long id = (Long)row.getValue(idField);
         String lastVersion = (String)row.getValue(lastVersionField);
+        if (lastVersion == null)
+          lastVersion = "";
         String lastOutputVersion = (String)row.getValue(lastOutputVersionField);
+        if (lastOutputVersion == null)
+          lastOutputVersion = "";
         String authorityName = (String)row.getValue(authorityNameField);
+        if (authorityName == null)
+          authorityName = "";
         String paramVersion = (String)row.getValue(forcedParamsField);
+        if (paramVersion == null)
+          paramVersion = "";
         rval[position.intValue()] = new DocumentIngestStatus(lastVersion,new String[0],lastOutputVersion,authorityName,paramVersion,new String[0]);
       }
     }
@@ -1928,4 +1943,140 @@ public class IncrementalIngester extends
       return outputVersion;
     }
   }
+  
+  /** Base wrapper class for output history activity.  This converts the activity names logged by an output
+  * connector into activity names qualified with the output connection name. */
+  protected static class OutputRecordingActivity implements IOutputHistoryActivity
+  {
+    protected final IOutputHistoryActivity activityProvider;
+    protected final String outputConnectionName;
+    
+    public OutputRecordingActivity(IOutputHistoryActivity activityProvider, String outputConnectionName)
+    {
+      this.activityProvider = activityProvider;
+      this.outputConnectionName = outputConnectionName;
+    }
+    
+    /** Record time-stamped information about the activity of the output connector.
+    *@param startTime is either null or the time since the start of epoch in milliseconds (Jan 1, 1970).  Every
+    *       activity has an associated time; the startTime field records when the activity began.  A null value
+    *       indicates that the start time and the finishing time are the same.
+    *@param activityType is a string which is fully interpretable only in the context of the connector involved, which is
+    *       used to categorize what kind of activity is being recorded.  For example, a web connector might record a
+    *       "fetch document" activity.  Cannot be null.
+    *@param dataSize is the number of bytes of data involved in the activity, or null if not applicable.
+    *@param entityURI is a (possibly long) string which identifies the object involved in the history record.
+    *       The interpretation of this field will differ from connector to connector.  May be null.
+    *@param resultCode contains a terse description of the result of the activity.  The description is limited in
+    *       size to 255 characters, and can be interpreted only in the context of the current connector.  May be null.
+    *@param resultDescription is a (possibly long) human-readable string which adds detail, if required, to the result
+    *       described in the resultCode field.  This field is not meant to be queried on.  May be null.
+    */
+    @Override
+    public void recordActivity(Long startTime, String activityType, Long dataSize,
+      String entityURI, String resultCode, String resultDescription)
+      throws ManifoldCFException
+    {
+      activityProvider.recordActivity(startTime,ManifoldCF.qualifyOutputActivityName(activityType,outputConnectionName),
+        dataSize,entityURI,resultCode,resultDescription);
+    }
+
+  }
+  
+  protected static class OutputRemoveActivitiesWrapper extends OutputRecordingActivity implements IOutputRemoveActivity
+  {
+    protected final IOutputRemoveActivity activities;
+    
+    public OutputRemoveActivitiesWrapper(IOutputRemoveActivity activities, String outputConnectionName)
+    {
+      super(activities,outputConnectionName);
+      this.activities = activities;
+    }
+
+  }
+  
+  protected static class OutputActivitiesWrapper extends OutputRecordingActivity implements IOutputActivity
+  {
+    protected final IOutputActivity activities;
+    
+    public OutputActivitiesWrapper(IOutputActivity activities, String outputConnectionName)
+    {
+      super(activities,outputConnectionName);
+      this.activities = activities;
+    }
+    
+    /** Qualify an access token appropriately, to match access tokens as returned by mod_aa.  This method
+    * includes the authority name with the access token, if any, so that each authority may establish its own token space.
+    *@param authorityNameString is the name of the authority to use to qualify the access token.
+    *@param accessToken is the raw, repository access token.
+    *@return the properly qualified access token.
+    */
+    @Override
+    public String qualifyAccessToken(String authorityNameString, String accessToken)
+      throws ManifoldCFException
+    {
+      return activities.qualifyAccessToken(authorityNameString,accessToken);
+    }
+
+    /** Send a document via the pipeline to the next output connection.
+    *@param document is the document data to be processed (handed to the output data store).
+    *@return the document status (accepted or permanently rejected); return codes are listed in IPipelineConnector.
+    */
+    public int sendDocument(RepositoryDocument document)
+      throws ManifoldCFException, ServiceInterruption
+    {
+      return activities.sendDocument(document);
+    }
+
+    /** Detect if a mime type is acceptable downstream or not.  This method is used to determine whether it makes sense to fetch a document
+    * in the first place.
+    *@param mimeType is the mime type of the document.
+    *@return true if the mime type can be accepted by the downstream connection.
+    */
+    @Override
+    public boolean checkMimeTypeIndexable(String mimeType)
+      throws ManifoldCFException, ServiceInterruption
+    {
+      return activities.checkMimeTypeIndexable(mimeType);
+    }
+
+    /** Pre-determine whether a document (passed here as a File object) is acceptable downstream.  This method is
+    * used to determine whether a document needs to be actually transferred.  This hook is provided mainly to support
+    * search engines that only handle a small set of accepted file types.
+    *@param localFile is the local file to check.
+    *@return true if the file is acceptable by the downstream connection.
+    */
+    @Override
+    public boolean checkDocumentIndexable(File localFile)
+      throws ManifoldCFException, ServiceInterruption
+    {
+      return activities.checkDocumentIndexable(localFile);
+    }
+
+    /** Pre-determine whether a document's length is acceptable downstream.  This method is used
+    * to determine whether to fetch a document in the first place.
+    *@param length is the length of the document.
+    *@return true if a document of this length is acceptable to the downstream connection.
+    */
+    @Override
+    public boolean checkLengthIndexable(long length)
+      throws ManifoldCFException, ServiceInterruption
+    {
+      return activities.checkLengthIndexable(length);
+    }
+
+    /** Pre-determine whether a document's URL is acceptable downstream.  This method is used
+    * to help filter out documents that cannot be indexed in advance.
+    *@param url is the URL of the document.
+    *@return true if the URL is acceptable to the downstream connection.
+    */
+    @Override
+    public boolean checkURLIndexable(String url)
+      throws ManifoldCFException, ServiceInterruption
+    {
+      return activities.checkURLIndexable(url);
+    }
+
+  }
+  
 }

Modified: manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/OutputConnectionManagerFactory.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/OutputConnectionManagerFactory.java?rev=1600869&r1=1600868&r2=1600869&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/OutputConnectionManagerFactory.java (original)
+++ manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/OutputConnectionManagerFactory.java Fri Jun  6 12:35:33 2014
@@ -66,26 +66,23 @@ public class OutputConnectionManagerFact
   {
     IOutputConnectionManager manager = make(tc);
     IOutputConnection[] connections = manager.getAllConnections();
-    HashMap map = new HashMap();
-    int i = 0;
-    while (i < connections.length)
+    Set<String> map = new HashSet();
+    for (IOutputConnection connection : connections)
     {
-      IOutputConnection connection = connections[i++];
       String connectionName = connection.getName();
       String[] activities = OutputConnectorFactory.getActivitiesList(tc,connection.getClassName());
-      int j = 0;
-      while (j < activities.length)
+      for (String activityName : activities)
       {
-        String activity = activities[j++] + " ("+connectionName+")";
-        map.put(activity,activity);
+        String activity = ManifoldCF.qualifyOutputActivityName(activityName,connectionName);
+        map.add(activity);
       }
     }
     String[] rval = new String[map.size()];
-    i = 0;
-    Iterator iter = map.keySet().iterator();
+    int i = 0;
+    Iterator<String> iter = map.iterator();
     while (iter.hasNext())
     {
-      rval[i++] = (String)iter.next();
+      rval[i++] = iter.next();
     }
     java.util.Arrays.sort(rval);
     return rval;

Modified: manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/TransformationConnectionManagerFactory.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/TransformationConnectionManagerFactory.java?rev=1600869&r1=1600868&r2=1600869&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/TransformationConnectionManagerFactory.java (original)
+++ manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/TransformationConnectionManagerFactory.java Fri Jun  6 12:35:33 2014
@@ -73,7 +73,7 @@ public class TransformationConnectionMan
       String[] activities = TransformationConnectorFactory.getActivitiesList(tc,connection.getClassName());
       for (String baseActivity : activities)
       {
-        String activity = baseActivity + " ("+connectionName+")";
+        String activity = ManifoldCF.qualifyTransformationActivityName(baseActivity, connectionName);
         map.add(activity);
       }
     }

Modified: manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/ManifoldCF.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/ManifoldCF.java?rev=1600869&r1=1600868&r2=1600869&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/ManifoldCF.java (original)
+++ manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/ManifoldCF.java Fri Jun  6 12:35:33 2014
@@ -171,6 +171,26 @@ public class ManifoldCF extends org.apac
     AgentManagerFactory.noteOutputConnectionChange(threadContext,connectionName);
   }
   
+  /** Qualify output activity name.
+  *@param outputActivityName is the name of the output activity.
+  *@param outputConnectionName is the corresponding name of the output connection.
+  *@return the qualified (global) activity name.
+  */
+  public static String qualifyOutputActivityName(String outputActivityName, String outputConnectionName)
+  {
+    return outputActivityName+" ("+outputConnectionName+")";
+  }
+
+  /** Qualify transformation activity name.
+  *@param transformationActivityName is the name of the transformation activity.
+  *@param transformationConnectionName is the corresponding name of the transformation connection.
+  *@return the qualified (global) activity name.
+  */
+  public static String qualifyTransformationActivityName(String transformationActivityName, String transformationConnectionName)
+  {
+    return transformationActivityName+" ["+transformationConnectionName+"]";
+  }
+
   // Helper methods for API support.  These are made public so connectors can use them to implement the executeCommand method.
   
   // These are the universal node types.

Modified: manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java?rev=1600869&r1=1600868&r2=1600869&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java (original)
+++ manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java Fri Jun  6 12:35:33 2014
@@ -1123,16 +1123,6 @@ public class ManifoldCF extends org.apac
 
   }
 
-  /** Qualify output activity name.
-  *@param outputActivityName is the name of the output activity.
-  *@param outputConnectionName is the corresponding name of the output connection.
-  *@return the qualified (global) activity name.
-  */
-  public static String qualifyOutputActivityName(String outputActivityName, String outputConnectionName)
-  {
-    return outputActivityName+" ("+outputConnectionName+")";
-  }
-
   /** Get the activities list for a given repository connection.
   */
   public static String[] getActivitiesList(IThreadContext threadContext, String connectionName)
@@ -1143,31 +1133,35 @@ public class ManifoldCF extends org.apac
     if (thisConnection == null)
       return null;
     String[] outputActivityList = OutputConnectionManagerFactory.getAllOutputActivities(threadContext);
+    String[] transformationActivityList = TransformationConnectionManagerFactory.getAllTransformationActivities(threadContext);
     String[] connectorActivityList = RepositoryConnectorFactory.getActivitiesList(threadContext,thisConnection.getClassName());
     String[] globalActivityList = IRepositoryConnectionManager.activitySet;
-    String[] activityList = new String[outputActivityList.length + ((connectorActivityList==null)?0:connectorActivityList.length) + globalActivityList.length];
+    String[] activityList = new String[transformationActivityList.length + outputActivityList.length + ((connectorActivityList==null)?0:connectorActivityList.length) + globalActivityList.length];
     int k2 = 0;
-    int j;
+    if (transformationActivityList != null)
+    {
+      for (String transformationActivity: transformationActivityList)
+      {
+        activityList[k2++] = transformationActivity;
+      }
+    }
     if (outputActivityList != null)
     {
-      j = 0;
-      while (j < outputActivityList.length)
+      for (String outputActivity : outputActivityList)
       {
-        activityList[k2++] = outputActivityList[j++];
+        activityList[k2++] = outputActivity;
       }
     }
     if (connectorActivityList != null)
     {
-      j = 0;
-      while (j < connectorActivityList.length)
+      for (String connectorActivity : connectorActivityList)
       {
-        activityList[k2++] = connectorActivityList[j++];
+        activityList[k2++] = connectorActivity;
       }
     }
-    j = 0;
-    while (j < globalActivityList.length)
+    for (String globalActivity : globalActivityList)
     {
-      activityList[k2++] = globalActivityList[j++];
+      activityList[k2++] = globalActivity;
     }
     java.util.Arrays.sort(activityList);
     return activityList;

Modified: manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1600869&r1=1600868&r2=1600869&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java (original)
+++ manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java Fri Jun  6 12:35:33 2014
@@ -160,7 +160,7 @@ public class WorkerThread extends Thread
 
             IRepositoryConnection connection = qds.getConnection();
             
-            OutputActivity ingestLogger = new OutputActivity(connectionName,connMgr,outputName);
+            OutputActivity ingestLogger = new OutputActivity(connectionName,connMgr);
 
             // The flow through this section of the code is as follows.
             // (1) We start with a list of documents
@@ -315,16 +315,18 @@ public class WorkerThread extends Thread
                       }
                     }
 
-                    // Get the output version string.
+                    // Get the output version string. Cannot be null.
                     String outputVersion = ingester.getOutputDescription(outputName,outputSpec);
-                    // Get the transformation version strings.
+                    // Get the transformation version strings.  Cannot be null.
                     String[] transformationVersions = ingester.getTransformationDescriptions(transformationNames,transformationSpecifications);
                     
                     Set<String> abortSet = new HashSet<String>();
-                    VersionActivity versionActivity = new VersionActivity(job.getID(),processID,connectionName,outputName,transformationNames,connMgr,jobManager,ingester,abortSet,outputVersion,transformationVersions);
+                    VersionActivity versionActivity = new VersionActivity(job.getID(),processID,connectionName,outputName,transformationNames,connMgr,jobManager,ingester,abortSet,outputVersion,transformationVersions,ingestLogger);
 
                     String aclAuthority = connection.getACLAuthority();
-                    boolean isDefaultAuthority = (aclAuthority == null || aclAuthority.length() == 0);
+                    if (aclAuthority == null)
+                      aclAuthority = "";
+                    boolean isDefaultAuthority = (aclAuthority.length() == 0);
 
                     if (Logging.threads.isDebugEnabled())
                       Logging.threads.debug("Worker thread getting versions for "+Integer.toString(currentDocIDArray.length)+" documents");
@@ -400,10 +402,6 @@ public class WorkerThread extends Thread
                       // This try{ } is for releasing document versions at the connector level.
                       try
                       {
-                        // Organize what we need for document status comparison, and get it into a canonical form.
-                        String newOutputVersion = outputVersion;
-                        if (newOutputVersion == null)
-                          newOutputVersion = "";
 
                         // Loop through documents now, and amass what we need to fetch.
                         // We also need to tally: (1) what needs to be marked as deleted via
@@ -429,9 +427,6 @@ public class WorkerThread extends Thread
                             DocumentIngestStatus oldDocStatus = qd.getLastIngestedStatus();
                             String documentIDHash = dd.getDocumentIdentifierHash();
                             String newDocVersion = newVersionStringArray[i];
-                            String newAuthorityName = aclAuthority;
-                            if (newAuthorityName == null)
-                              newAuthorityName = "";
 
                             versionMap.put(dd.getDocumentIdentifierHash(),newDocVersion);
 
@@ -458,17 +453,11 @@ public class WorkerThread extends Thread
                                 // that was there before is there now (which may mean a rescan),
                                 // or (2) there are different versions (which ALWAYS means a rescan).
                                 String oldDocVersion = oldDocStatus.getDocumentVersion();
-                                if (oldDocVersion == null)
-                                  oldDocVersion = "";
                                 String oldAuthorityName = oldDocStatus.getDocumentAuthorityNameString();
-                                if (oldAuthorityName == null)
-                                  oldAuthorityName = "";
                                 String oldOutputVersion = oldDocStatus.getOutputVersion();
-                                if (oldOutputVersion == null)
-                                  oldOutputVersion = "";
                                 String oldParameterVersion = oldDocStatus.getParameterVersion();
-                                if (oldParameterVersion == null)
-                                  oldParameterVersion = "";
+                                String[] oldTransformationNames = oldDocStatus.getTransformationNameStrings();
+                                String[] oldTransformationVersions = oldDocStatus.getTransformationVersions();
 
                                 // Start the comparison processing
                                 if (newDocVersion.length() == 0)
@@ -477,9 +466,10 @@ public class WorkerThread extends Thread
                                   allowIngest = true;
                                 }
                                 else if (oldDocVersion.equals(newDocVersion) &&
-                                  oldAuthorityName.equals(newAuthorityName) &&
-                                  oldOutputVersion.equals(newOutputVersion) &&
-                                  oldParameterVersion.equals(newParameterVersion))
+                                  oldAuthorityName.equals(aclAuthority) &&
+                                  oldOutputVersion.equals(outputVersion) &&
+                                  oldParameterVersion.equals(newParameterVersion) &&
+                                  compareTransformations(oldTransformationNames,oldTransformationVersions,transformationNames,transformationVersions))
                                 {
                                   // The old logic was as follows:
                                   //
@@ -1247,6 +1237,20 @@ public class WorkerThread extends Thread
     sb.append(delim);
   }
 
+  protected static boolean compareTransformations(String[] oldTransformationNames, String[] oldTransformationVersions,
+    String[] transformationNames, String[] transformationVersions)
+  {
+    if (oldTransformationNames.length != transformationNames.length)
+      return false;
+    for (int i = 0; i < oldTransformationNames.length; i++)
+    {
+      if (!oldTransformationNames[i].equals(transformationNames[i]) ||
+        !oldTransformationVersions[i].equals(transformationVersions[i]))
+        return false;
+    }
+    return true;
+  }
+
   /** The maximum number of adds that happen in a single transaction */
   protected static final int MAX_ADDS_IN_TRANSACTION = 20;
 
@@ -1267,7 +1271,7 @@ public class WorkerThread extends Thread
     protected final Set<String> abortSet;
     protected final String outputVersion;
     protected final String[] transformationVersions;
-
+    protected final CheckActivity checkActivity;
     /** Constructor.
     */
     public VersionActivity(Long jobID, String processID,
@@ -1275,7 +1279,8 @@ public class WorkerThread extends Thread
       String[] transformationConnectionNames,
       IRepositoryConnectionManager connMgr,
       IJobManager jobManager, IIncrementalIngester ingester, Set<String> abortSet,
-      String outputVersion, String[] transformationVersions)
+      String outputVersion, String[] transformationVersions,
+      CheckActivity checkActivity)
     {
       this.jobID = jobID;
       this.processID = processID;
@@ -1288,6 +1293,7 @@ public class WorkerThread extends Thread
       this.abortSet = abortSet;
       this.outputVersion = outputVersion;
       this.transformationVersions = transformationVersions;
+      this.checkActivity = checkActivity;
     }
 
     /** Check whether a mime type is indexable by the currently specified output connector.
@@ -1298,8 +1304,10 @@ public class WorkerThread extends Thread
     public boolean checkMimeTypeIndexable(String mimeType)
       throws ManifoldCFException, ServiceInterruption
     {
-      // MHL
-      return ingester.checkMimeTypeIndexable(outputConnectionName,outputVersion,mimeType);
+      return ingester.checkMimeTypeIndexable(
+        transformationConnectionNames,transformationVersions,
+        outputConnectionName,outputVersion,mimeType,
+        checkActivity);
     }
 
     /** Check whether a document is indexable by the currently specified output connector.
@@ -1310,8 +1318,10 @@ public class WorkerThread extends Thread
     public boolean checkDocumentIndexable(File localFile)
       throws ManifoldCFException, ServiceInterruption
     {
-      // MHL
-      return ingester.checkDocumentIndexable(outputConnectionName,outputVersion,localFile);
+      return ingester.checkDocumentIndexable(
+        transformationConnectionNames,transformationVersions,
+        outputConnectionName,outputVersion,localFile,
+        checkActivity);
     }
 
     /** Check whether a document of a specified length is indexable by the currently specified output connector.
@@ -1322,8 +1332,10 @@ public class WorkerThread extends Thread
     public boolean checkLengthIndexable(long length)
       throws ManifoldCFException, ServiceInterruption
     {
-      // MHL
-      return ingester.checkLengthIndexable(outputConnectionName,outputVersion,length);
+      return ingester.checkLengthIndexable(
+        transformationConnectionNames,transformationVersions,
+        outputConnectionName,outputVersion,length,
+        checkActivity);
     }
 
     /** Pre-determine whether a document's URL is indexable by this connector.  This method is used by participating repository connectors
@@ -1335,8 +1347,10 @@ public class WorkerThread extends Thread
     public boolean checkURLIndexable(String url)
       throws ManifoldCFException, ServiceInterruption
     {
-      // MHL
-      return ingester.checkURLIndexable(outputConnectionName,outputVersion,url);
+      return ingester.checkURLIndexable(
+        transformationConnectionNames,transformationVersions,
+        outputConnectionName,outputVersion,url,
+        checkActivity);
     }
 
     /** Record time-stamped information about the activity of the connector.
@@ -1487,7 +1501,7 @@ public class WorkerThread extends Thread
     protected final IIncrementalIngester ingester;
     protected final String connectionName;
     protected final String outputName;
-    protected final String[] transformationNames;
+    protected final String[] transformationConnectionNames;
     protected final long currentTime;
     protected final Long expireInterval;
     protected final Map<String,Set<String>> forcedMetadata;
@@ -1525,7 +1539,7 @@ public class WorkerThread extends Thread
       IThreadContext threadContext,
       IReprioritizationTracker rt, IJobManager jobManager,
       IIncrementalIngester ingester,
-      String connectionName, String outputName, String[] transformationNames,
+      String connectionName, String outputName, String[] transformationConnectionNames,
       long currentTime,
       Long expireInterval,
       Map<String,Set<String>> forcedMetadata,
@@ -1545,7 +1559,7 @@ public class WorkerThread extends Thread
       this.ingester = ingester;
       this.connectionName = connectionName;
       this.outputName = outputName;
-      this.transformationNames = transformationNames;
+      this.transformationConnectionNames = transformationConnectionNames;
       this.currentTime = currentTime;
       this.expireInterval = expireInterval;
       this.forcedMetadata = forcedMetadata;
@@ -1818,7 +1832,7 @@ public class WorkerThread extends Thread
       }
         
       // First, we need to add into the metadata the stuff from the job description.
-      ingester.documentIngest(transformationNames,
+      ingester.documentIngest(transformationConnectionNames,
         outputName,
         connectionName,documentIdentifierHash,
         version,transformationVersions,outputVersion,parameterVersion,
@@ -2171,8 +2185,10 @@ public class WorkerThread extends Thread
     public boolean checkMimeTypeIndexable(String mimeType)
       throws ManifoldCFException, ServiceInterruption
     {
-      // MHL
-      return ingester.checkMimeTypeIndexable(outputName,outputVersion,mimeType);
+      return ingester.checkMimeTypeIndexable(
+        transformationConnectionNames,transformationVersions,
+        outputName,outputVersion,mimeType,
+        ingestLogger);
     }
 
     /** Check whether a document is indexable by the currently specified output connector.
@@ -2183,8 +2199,10 @@ public class WorkerThread extends Thread
     public boolean checkDocumentIndexable(File localFile)
       throws ManifoldCFException, ServiceInterruption
     {
-      // MHL
-      return ingester.checkDocumentIndexable(outputName,outputVersion,localFile);
+      return ingester.checkDocumentIndexable(
+        transformationConnectionNames,transformationVersions,
+        outputName,outputVersion,localFile,
+        ingestLogger);
     }
 
     /** Check whether a document of a specified length is indexable by the currently specified output connector.
@@ -2195,8 +2213,10 @@ public class WorkerThread extends Thread
     public boolean checkLengthIndexable(long length)
       throws ManifoldCFException, ServiceInterruption
     {
-      // MHL
-      return ingester.checkLengthIndexable(outputName,outputVersion,length);
+      return ingester.checkLengthIndexable(
+        transformationConnectionNames,transformationVersions,
+        outputName,outputVersion,length,
+        ingestLogger);
     }
 
     /** Pre-determine whether a document's URL is indexable by this connector.  This method is used by participating repository connectors
@@ -2208,7 +2228,10 @@ public class WorkerThread extends Thread
     public boolean checkURLIndexable(String url)
       throws ManifoldCFException, ServiceInterruption
     {
-      return ingester.checkURLIndexable(outputName,outputVersion,url);
+      return ingester.checkURLIndexable(
+        transformationConnectionNames,transformationVersions,
+        outputName,outputVersion,url,
+        ingestLogger);
     }
 
     /** Create a global string from a simple string.
@@ -2506,23 +2529,78 @@ public class WorkerThread extends Thread
     }
   }
 
+  /** The check activity class */
+  protected static class CheckActivity implements IOutputCheckActivity
+  {
+    public CheckActivity()
+    {
+    }
+
+    /** Detect if a mime type is acceptable downstream or not.  This method is used to determine whether it makes sense to fetch a document
+    * in the first place.
+    *@param mimeType is the mime type of the document.
+    *@return true if the mime type can be accepted by the downstream connection.
+    */
+    @Override
+    public boolean checkMimeTypeIndexable(String mimeType)
+      throws ManifoldCFException, ServiceInterruption
+    {
+      return false;
+    }
+
+    /** Pre-determine whether a document (passed here as a File object) is acceptable downstream.  This method is
+    * used to determine whether a document needs to be actually transferred.  This hook is provided mainly to support
+    * search engines that only handle a small set of accepted file types.
+    *@param localFile is the local file to check.
+    *@return true if the file is acceptable by the downstream connection.
+    */
+    @Override
+    public boolean checkDocumentIndexable(File localFile)
+      throws ManifoldCFException, ServiceInterruption
+    {
+      return false;
+    }
+
+    /** Pre-determine whether a document's length is acceptable downstream.  This method is used
+    * to determine whether to fetch a document in the first place.
+    *@param length is the length of the document.
+    *@return true if a document of this length is acceptable to the downstream connection.
+    */
+    @Override
+    public boolean checkLengthIndexable(long length)
+      throws ManifoldCFException, ServiceInterruption
+    {
+      return false;
+    }
+
+    /** Pre-determine whether a document's URL is acceptable downstream.  This method is used
+    * to help filter out documents that cannot be indexed in advance.
+    *@param url is the URL of the document.
+    *@return true if the URL is acceptable to the downstream connection.
+    */
+    @Override
+    public boolean checkURLIndexable(String url)
+      throws ManifoldCFException, ServiceInterruption
+    {
+      return false;
+    }
+    
+  }
+  
   /** The ingest logger class */
-  protected static class OutputActivity implements IOutputActivity
+  protected static class OutputActivity extends CheckActivity implements IOutputActivity
   {
 
     // Connection name
-    protected String connectionName;
+    protected final String connectionName;
     // Connection manager
-    protected IRepositoryConnectionManager connMgr;
-    // Output connection name
-    protected String outputConnectionName;
+    protected final IRepositoryConnectionManager connMgr;
 
     /** Constructor */
-    public OutputActivity(String connectionName, IRepositoryConnectionManager connMgr, String outputConnectionName)
+    public OutputActivity(String connectionName, IRepositoryConnectionManager connMgr)
     {
       this.connectionName = connectionName;
       this.connMgr = connMgr;
-      this.outputConnectionName = outputConnectionName;
     }
 
     /** Record time-stamped information about the activity of the output connector.
@@ -2540,11 +2618,12 @@ public class WorkerThread extends Thread
     *@param resultDescription is a (possibly long) human-readable string which adds detail, if required, to the result
     *       described in the resultCode field.  This field is not meant to be queried on.  May be null.
     */
+    @Override
     public void recordActivity(Long startTime, String activityType, Long dataSize,
       String entityURI, String resultCode, String resultDescription)
       throws ManifoldCFException
     {
-      connMgr.recordHistory(connectionName,startTime,ManifoldCF.qualifyOutputActivityName(activityType,outputConnectionName),dataSize,entityURI,resultCode,
+      connMgr.recordHistory(connectionName,startTime,activityType,dataSize,entityURI,resultCode,
         resultDescription,null);
     }
 
@@ -2554,6 +2633,7 @@ public class WorkerThread extends Thread
     *@param accessToken is the raw, repository access token.
     *@return the properly qualified access token.
     */
+    @Override
     public String qualifyAccessToken(String authorityNameString, String accessToken)
       throws ManifoldCFException
     {
@@ -2567,6 +2647,7 @@ public class WorkerThread extends Thread
     *@param document is the document data to be processed (handed to the output data store).
     *@return the document status (accepted or permanently rejected); return codes are listed in IPipelineConnector.
     */
+    @Override
     public int sendDocument(RepositoryDocument document)
       throws ManifoldCFException, ServiceInterruption
     {
@@ -2574,51 +2655,6 @@ public class WorkerThread extends Thread
       return IPipelineConnector.DOCUMENTSTATUS_REJECTED;
     }
 
-    /** Detect if a mime type is acceptable downstream or not.  This method is used to determine whether it makes sense to fetch a document
-    * in the first place.
-    *@param mimeType is the mime type of the document.
-    *@return true if the mime type can be accepted by the downstream connection.
-    */
-    public boolean checkMimeTypeIndexable(String mimeType)
-      throws ManifoldCFException, ServiceInterruption
-    {
-      return false;
-    }
-
-    /** Pre-determine whether a document (passed here as a File object) is acceptable downstream.  This method is
-    * used to determine whether a document needs to be actually transferred.  This hook is provided mainly to support
-    * search engines that only handle a small set of accepted file types.
-    *@param localFile is the local file to check.
-    *@return true if the file is acceptable by the downstream connection.
-    */
-    public boolean checkDocumentIndexable(File localFile)
-      throws ManifoldCFException, ServiceInterruption
-    {
-      return false;
-    }
-
-    /** Pre-determine whether a document's length is acceptable downstream.  This method is used
-    * to determine whether to fetch a document in the first place.
-    *@param length is the length of the document.
-    *@return true if the file is acceptable by the downstream connection.
-    */
-    public boolean checkLengthIndexable(long length)
-      throws ManifoldCFException, ServiceInterruption
-    {
-      return false;
-    }
-
-    /** Pre-determine whether a document's URL is acceptable downstream.  This method is used
-    * to help filter out documents that cannot be indexed in advance.
-    *@param url is the URL of the document.
-    *@return true if the file is acceptable by the downstream connection.
-    */
-    public boolean checkURLIndexable(String url)
-      throws ManifoldCFException, ServiceInterruption
-    {
-      return false;
-    }
-
   }
 
 }