Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/06/15 13:49:19 UTC

svn commit: r1602680 [4/4] - in /manifoldcf/trunk: ./ framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/ framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/ framework/pull-agent/src/main/java/org/apache/m...

Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1602680&r1=1602679&r2=1602680&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java Sun Jun 15 11:49:19 2014
@@ -142,20 +142,16 @@ public class WorkerThread extends Thread
             if (Logging.threads.isDebugEnabled())
               Logging.threads.debug("Worker thread received "+Integer.toString(qds.getCount())+" documents");
 
-            // Universal data, from the job
+            // Build a basic pipeline specification right off; we need it whenever
+            // we interact with the Incremental Ingester.
+            IPipelineSpecificationBasic pipelineSpecificationBasic = new PipelineSpecificationBasic(job);
+            String lastIndexedOutputConnectionName = ingester.getLastIndexedOutputConnectionName(pipelineSpecificationBasic);
+            // Compute a parameter version string for all documents in this job
+            String newParameterVersion = packParameters(job.getForcedMetadata());
+
+            // Universal job data we'll need later
             String connectionName = job.getConnectionName();
-            String outputName = job.getOutputConnectionName();
-            int pipelineCount = job.countPipelineStages();
-            String[] transformationNames = new String[pipelineCount];
-            OutputSpecification[] transformationSpecifications = new OutputSpecification[pipelineCount];
-            for (int k = 0; k < pipelineCount; k++)
-            {
-              transformationNames[k] = job.getPipelineStageConnectionName(k);
-              transformationSpecifications[k] = job.getPipelineStageSpecification(k);
-            }
-            
             DocumentSpecification spec = job.getSpecification();
-            OutputSpecification outputSpec = job.getOutputSpecification();
             int jobType = job.getType();
 
             IRepositoryConnection connection = qds.getConnection();
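
In short: the per-stage name and specification arrays above give way to one
per-job object plus one question to the ingester.  A minimal sketch of the new
setup, using only calls that appear in this commit and variable names from the
surrounding WorkerThread code (the pipeline-spec interfaces live under
org.apache.manifoldcf.agents.interfaces, per the modified-path list at the top):

    // Once per batch: the basic spec captures just the job's pipeline topology.
    IPipelineSpecificationBasic basicSpec = new PipelineSpecificationBasic(job);
    // Which output gets indexed last is an IncrementalIngester implementation
    // detail, so ask the ingester rather than hard-coding the ordering here.
    String lastOutput = ingester.getLastIndexedOutputConnectionName(basicSpec);
    // One forced-metadata ("parameter") version string covers the whole job.
    String parameterVersion = packParameters(job.getForcedMetadata());

    // Per document: prior ingest status is now tracked per output connection,
    // and the safe one to compare against is the last output in the sequence.
    DocumentIngestStatus dis = qd.getLastIngestedStatus(lastOutput);
    String oldVersion = (dis == null) ? null : dis.getDocumentVersion();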
@@ -297,6 +293,14 @@ public class WorkerThread extends Thread
                     // === Fetch document versions ===
                     String[] currentDocIDHashArray = new String[activeDocuments.size()];
                     String[] currentDocIDArray = new String[activeDocuments.size()];
+                    // We used to feed the old document version back to the repository connector so that it could
+                    // decide whether to fetch the document, or just call documentRecord().  The problem in a
+                    // multi-output world is that we may have had an error, and successfully output a document to
+                    // some outputs but not to others.  But since outputs are handled in a specific order, it should
+                    // always be safe to get the document version from the *last* output in the sequence.  The problem
+                    // is that we need to be able to figure out which output that is, and that is currently an
+                    // implementation detail of IncrementalIngester.  We solve this by letting IncrementalIngester make the decision.
+
                     String[] oldVersionStringArray = new String[activeDocuments.size()];
 
                     for (int i = 0; i < activeDocuments.size(); i++)
@@ -304,7 +308,7 @@ public class WorkerThread extends Thread
                       QueuedDocument qd = activeDocuments.get(i);
                       currentDocIDHashArray[i] = qd.getDocumentDescription().getDocumentIdentifierHash();
                       currentDocIDArray[i] = qd.getDocumentDescription().getDocumentIdentifier();
-                      DocumentIngestStatus dis = qd.getLastIngestedStatus();
+                      DocumentIngestStatus dis = qd.getLastIngestedStatus(lastIndexedOutputConnectionName);
                       if (dis == null)
                         oldVersionStringArray[i] = null;
                       else
@@ -315,18 +319,11 @@ public class WorkerThread extends Thread
                       }
                     }
 
-                    // Get the output version string. Cannot be null.
-                    String outputDescriptionString = ingester.getOutputDescription(outputName,outputSpec);
-                    // Get the transformation version strings.  Cannot be null.
-                    String[] transformationDescriptionStrings = ingester.getTransformationDescriptions(transformationNames,transformationSpecifications);
+                    // Create a full PipelineSpecification, including description strings.  (This is still per-job, but it can throw ServiceInterruptions, so we do it here.)
+                    IPipelineSpecification pipelineSpecification = new PipelineSpecification(pipelineSpecificationBasic,job,ingester);
                     
-                    // New version strings
-                    String newOutputVersion = outputDescriptionString;
-                    String newParameterVersion = packParameters(job.getForcedMetadata());
-                    String newTransformationVersion = packTransformations(transformationNames,transformationDescriptionStrings);
-
                     Set<String> abortSet = new HashSet<String>();
-                    VersionActivity versionActivity = new VersionActivity(job.getID(),processID,connectionName,outputName,transformationNames,connMgr,jobManager,ingester,abortSet,outputDescriptionString,transformationDescriptionStrings,ingestLogger);
+                    VersionActivity versionActivity = new VersionActivity(job.getID(),processID,connectionName,pipelineSpecification,connMgr,jobManager,ingester,abortSet,ingestLogger);
 
                     String aclAuthority = connection.getACLAuthority();
                     if (aclAuthority == null)
@@ -429,11 +426,12 @@ public class WorkerThread extends Thread
                           }
                           else
                           {
-                            DocumentIngestStatus oldDocStatus = qd.getLastIngestedStatus();
+                            // Compare against the old version.
+                            // We call the incremental ingester to decide for us whether or not to refetch the document.
+                            
                             String documentIDHash = dd.getDocumentIdentifierHash();
                             String newDocVersion = newVersionStringArray[i];
-
-                            versionMap.put(dd.getDocumentIdentifierHash(),newDocVersion);
+                            versionMap.put(documentIDHash,newDocVersion);
 
                             if (newDocVersion == null)
                             {
@@ -445,63 +443,11 @@ public class WorkerThread extends Thread
                               finishList.add(qd);
 
                               // See if we need to add, or update.
-                              boolean allowIngest = false;
-                              if (oldDocStatus == null)
-                              {
-                                // Add
-                                allowIngest = true;
-                                // Fall through to allow the processing
-                              }
-                              else
-                              {
-                                // Update.  There are two possibilities here.  (1) the same version
-                                // that was there before is there now (which may mean a rescan),
-                                // or (2) there are different versions (which ALWAYS means a rescan).
-                                String oldDocVersion = oldDocStatus.getDocumentVersion();
-                                String oldAuthorityName = oldDocStatus.getDocumentAuthorityNameString();
-                                String oldOutputVersion = oldDocStatus.getOutputVersion();
-                                String oldTransformationVersion = oldDocStatus.getTransformationVersion();
-                                String oldParameterVersion = oldDocStatus.getParameterVersion();
-
-                                // Start the comparison processing
-                                if (newDocVersion.length() == 0)
-                                {
-                                  // Always reingest
-                                  allowIngest = true;
-                                }
-                                else if (oldDocVersion.equals(newDocVersion) &&
-                                  oldAuthorityName.equals(aclAuthority) &&
-                                  oldOutputVersion.equals(newOutputVersion) &&
-                                  oldTransformationVersion.equals(newTransformationVersion) &&
-                                  oldParameterVersion.equals(newParameterVersion))
-                                {
-                                  // The old logic was as follows:
-                                  //
-                                  // If the crawl is an incremental crawl, then we do NOT add this
-                                  // document to the fetch list, even for scanning and no ingestion.
-                                  // But we *do* add it, scan only, if this was a "full crawl".
-                                  //
-                                  // Apparently this was designed to prevent a document that had
-                                  // already been processed and had queued stuff from causing deletions
-                                  // under 'full scan' conditions, because those child documents would
-                                  // not be requeued then.  This contrasts with the incremental case,
-                                  // where we really don't want to refetch the document simply to find
-                                  // children - or do we?  The connector has to make that decision, it
-                                  // seems to me.  If it's the kind of document that might have children,
-                                  // then rescanning is warranted under ANY conditions; if it's not,
-                                  // then the connector can decide to just do nothing.
-                                  //
-                                  // For the kinds of connectors where all documents have children,
-                                  // preventing the fetch is not likely to help much.  These kinds of
-                                  // connectors (rss and web) depend on the document checksum to
-                                  // determine version anyway, so the document is fetched regardless.
-                                  // At least we prevent the ingestion.
-
-                                  // Fall through to allow the scanning, but not the ingest
-                                }
-                                else
-                                  allowIngest = true;
-                              }
+                              IPipelineSpecificationWithVersions specWithVersions = new PipelineSpecificationWithVersions(pipelineSpecification,qd);
+                              boolean allowIngest = ingester.checkFetchDocument(specWithVersions,
+                                newDocVersion,
+                                newParameterVersion,
+                                aclAuthority);
 
                               fetchList.add(new DocumentToProcess(qd,!allowIngest));
                               if (!allowIngest)
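
The roughly fifty deleted lines compared five version strings (document,
authority, output, transformation, parameter) by hand; the replacement hands
the per-document state to the ingester and gets back a single yes/no.  A
sketch of the new decision path, assuming the interfaces this commit
introduces; the full PipelineSpecification adds the per-stage description
strings, which is why it can throw ServiceInterruption and is built inside
the retry scope rather than once per job:

    // Job-wide, but built where ServiceInterruption can be handled:
    IPipelineSpecification pipelineSpecification =
      new PipelineSpecification(pipelineSpecificationBasic, job, ingester);

    // Per document: overlay the job-wide spec with this document's recorded
    // per-output versions...
    IPipelineSpecificationWithVersions specWithVersions =
      new PipelineSpecificationWithVersions(pipelineSpecification, qd);
    // ...and let the ingester decide whether any stage needs a reingest.
    boolean allowIngest = ingester.checkFetchDocument(specWithVersions,
      newDocVersion, newParameterVersion, aclAuthority);
    // The document is still queued for processing either way; !allowIngest
    // preserves the old "scan only, no ingest" behavior.
    fetchList.add(new DocumentToProcess(qd, !allowIngest));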
@@ -527,14 +473,24 @@ public class WorkerThread extends Thread
                             checkClasses[i] = connectionName;
                             checkIDs[i] = ingesterCheckList.get(i);
                           }
-                          ingester.documentCheckMultiple(outputName,checkClasses,checkIDs,currentTime);
+                          ingester.documentCheckMultiple(pipelineSpecificationBasic,checkClasses,checkIDs,currentTime);
                         }
 
                         // First, make the things we will need for all subsequent steps.
+                        // We first need to assemble an IPipelineSpecificationWithVersions object for each document we're going to process.
+                        // We put this in a map so it can be looked up by document identifier.
+                        Map<String,IPipelineSpecificationWithVersions> fetchPipelineSpecifications = new HashMap<String,IPipelineSpecificationWithVersions>();
+                        for (int i = 0; i < fetchList.size(); i++)
+                        {
+                          QueuedDocument qd = fetchList.get(i).getDocument();
+                          fetchPipelineSpecifications.put(qd.getDocumentDescription().getDocumentIdentifierHash(),
+                            new PipelineSpecificationWithVersions(pipelineSpecification,qd));
+                        }
+                        
                         ProcessActivity activity = new ProcessActivity(job.getID(),processID,
                           threadContext,rt,jobManager,ingester,
-                          connectionName,outputName,transformationNames,
-                          outputDescriptionString,transformationDescriptionStrings,
+                          connectionName,pipelineSpecification,
+                          fetchPipelineSpecifications,
                           currentTime,
                           job.getExpiration(),
                           job.getForcedMetadata(),
@@ -542,7 +498,7 @@ public class WorkerThread extends Thread
                           job.getMaxInterval(),
                           job.getHopcountMode(),
                           connection,connector,connMgr,legalLinkTypes,ingestLogger,abortSet,
-                          newOutputVersion,newTransformationVersion,newParameterVersion);
+                          newParameterVersion);
                         try
                         {
 
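
Because an earlier error can leave a document successfully indexed on some
outputs but not others, each fetched document now carries its own
versions-aware overlay into processing; ProcessActivity receives the whole
map and looks the overlay up again at ingest time.  Roughly, with names as in
the hunk above:

    // One overlay per document to process, keyed by document identifier hash:
    Map<String,IPipelineSpecificationWithVersions> fetchPipelineSpecifications =
      new HashMap<String,IPipelineSpecificationWithVersions>();
    for (DocumentToProcess dtp : fetchList)
    {
      QueuedDocument qd = dtp.getDocument();
      fetchPipelineSpecifications.put(
        qd.getDocumentDescription().getDocumentIdentifierHash(),
        new PipelineSpecificationWithVersions(pipelineSpecification, qd));
    }
    // Later, inside ingestDocumentWithException(), the same overlay is looked
    // up by hash and becomes documentIngest()'s first argument; see the
    // documentIngest hunk near the end of this mail.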
@@ -679,7 +635,7 @@ public class WorkerThread extends Thread
                                   timeIDClasses[i] = connectionName;
                                   timeIDHashes[i] = documentIDHash;
                                 }
-                                long[] timeArray = ingester.getDocumentUpdateIntervalMultiple(outputName,timeIDClasses,timeIDHashes);
+                                long[] timeArray = ingester.getDocumentUpdateIntervalMultiple(pipelineSpecificationBasic,timeIDClasses,timeIDHashes);
                                 Long[] recheckTimeArray = new Long[timeArray.length];
                                 int[] actionArray = new int[timeArray.length];
                                 DocumentDescription[] recrawlDocs = new DocumentDescription[finishList.size()];
@@ -832,12 +788,12 @@ public class WorkerThread extends Thread
                   }
                   
                   // Now, handle the delete list
-                  processDeleteLists(outputName,connector,connection,jobManager,
+                  processDeleteLists(pipelineSpecificationBasic,connector,connection,jobManager,
                     deleteList,ingester,
                     job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),rt,currentTime);
 
                   // Handle hopcount removal
-                  processHopcountRemovalLists(outputName,connector,connection,jobManager,
+                  processHopcountRemovalLists(pipelineSpecificationBasic,connector,connection,jobManager,
                     hopcountremoveList,ingester,
                     job.getID(),legalLinkTypes,ingestLogger,job.getHopcountMode(),rt,currentTime);
 
@@ -989,7 +945,8 @@ public class WorkerThread extends Thread
   * of what the deletion method must do.  Specifically, it should be capable of deleting
   * documents from the index should they be already present.
   */
-  protected static void processHopcountRemovalLists(String outputName, IRepositoryConnector connector,
+  protected static void processHopcountRemovalLists(IPipelineSpecificationBasic pipelineSpecificationBasic,
+    IRepositoryConnector connector,
     IRepositoryConnection connection, IJobManager jobManager,
     List<QueuedDocument> hopcountremoveList,
     IIncrementalIngester ingester,
@@ -998,20 +955,21 @@ public class WorkerThread extends Thread
     throws ManifoldCFException
   {
     // Remove from index
-    hopcountremoveList = removeFromIndex(outputName,connection.getName(),jobManager,hopcountremoveList,ingester,ingestLogger);
+    hopcountremoveList = removeFromIndex(pipelineSpecificationBasic,connection.getName(),jobManager,hopcountremoveList,ingester,ingestLogger);
     // Mark as 'hopcountremoved' in the job queue
     processJobQueueHopcountRemovals(hopcountremoveList,connector,connection,
       jobManager,jobID,legalLinkTypes,hopcountMethod,rt,currentTime);
   }
 
   /** Clear specified documents out of the job queue and from the appliance.
-  *@param outputName is the output connection name.
+  *@param pipelineSpecificationBasic is the basic pipeline specification for this job.
   *@param jobManager is the job manager.
   *@param deleteList is a list of QueuedDocument objects to clean out.
   *@param ingester is the handle to the incremental ingestion API control object.
   *@param ingesterDeleteList is a list of document id's to delete.
   */
-  protected static void processDeleteLists(String outputName, IRepositoryConnector connector,
+  protected static void processDeleteLists(IPipelineSpecificationBasic pipelineSpecificationBasic,
+    IRepositoryConnector connector,
     IRepositoryConnection connection, IJobManager jobManager,
     List<QueuedDocument> deleteList,
     IIncrementalIngester ingester,
@@ -1020,7 +978,7 @@ public class WorkerThread extends Thread
     throws ManifoldCFException
   {
     // Remove from index
-    deleteList = removeFromIndex(outputName,connection.getName(),jobManager,deleteList,ingester,ingestLogger);
+    deleteList = removeFromIndex(pipelineSpecificationBasic,connection.getName(),jobManager,deleteList,ingester,ingestLogger);
     // Delete from the job queue
     processJobQueueDeletions(deleteList,connector,connection,
       jobManager,jobID,legalLinkTypes,hopcountMethod,rt,currentTime);
@@ -1029,7 +987,7 @@ public class WorkerThread extends Thread
   /** Remove a specified set of documents from the index.
   *@return the list of documents whose state needs to be updated in jobqueue.
   */
-  protected static List<QueuedDocument> removeFromIndex(String outputName,
+  protected static List<QueuedDocument> removeFromIndex(IPipelineSpecificationBasic pipelineSpecificationBasic,
     String connectionName, IJobManager jobManager, List<QueuedDocument> deleteList, 
     IIncrementalIngester ingester, OutputActivity ingestLogger)
     throws ManifoldCFException
@@ -1038,9 +996,8 @@ public class WorkerThread extends Thread
     for (int i = 0; i < deleteList.size(); i++)
     {
       QueuedDocument qd = deleteList.get(i);
-      DocumentIngestStatus oldDocStatus = qd.getLastIngestedStatus();
       // See if we need to delete from index
-      if (oldDocStatus != null)
+      if (qd.anyLastIngestedRecords())
       {
         // Queue up to issue deletion
         ingesterDeleteList.add(qd.getDocumentDescription().getDocumentIdentifierHash());
@@ -1062,7 +1019,7 @@ public class WorkerThread extends Thread
       // Try to delete the documents via the output connection.
       try
       {
-        ingester.documentDeleteMultiple(outputName,deleteClasses,deleteIDs,ingestLogger);
+        ingester.documentDeleteMultiple(pipelineSpecificationBasic,deleteClasses,deleteIDs,ingestLogger);
       }
       catch (ServiceInterruption e)
       {
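
Deletion gets the same treatment: whether a document still needs index cleanup
is no longer judged from a single output's status, but from whether *any*
output connection still has a record of it, and the delete itself fans out
across the whole pipeline in one call.  Condensed from the two hunks above:

    // A document is a deletion candidate if any output still records it:
    if (qd.anyLastIngestedRecords())
      ingesterDeleteList.add(qd.getDocumentDescription().getDocumentIdentifierHash());

    // One call then covers every output connection named in the basic spec:
    ingester.documentDeleteMultiple(pipelineSpecificationBasic,
      deleteClasses, deleteIDs, ingestLogger);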
@@ -1273,36 +1230,28 @@ public class WorkerThread extends Thread
     protected final Long jobID;
     protected final String processID;
     protected final String connectionName;
-    protected final String outputConnectionName;
-    protected final String[] transformationConnectionNames;
+    protected final IPipelineSpecification pipelineSpecification;
     protected final IRepositoryConnectionManager connMgr;
     protected final IJobManager jobManager;
     protected final IIncrementalIngester ingester;
     protected final Set<String> abortSet;
-    protected final String outputDescriptionString;
-    protected final String[] transformationDescriptionStrings;
     protected final CheckActivity checkActivity;
     /** Constructor.
     */
     public VersionActivity(Long jobID, String processID,
-      String connectionName, String outputConnectionName,
-      String[] transformationConnectionNames,
+      String connectionName, IPipelineSpecification pipelineSpecification,
       IRepositoryConnectionManager connMgr,
       IJobManager jobManager, IIncrementalIngester ingester, Set<String> abortSet,
-      String outputDescriptionString, String[] transformationDescriptionStrings,
       CheckActivity checkActivity)
     {
       this.jobID = jobID;
       this.processID = processID;
       this.connectionName = connectionName;
-      this.outputConnectionName = outputConnectionName;
-      this.transformationConnectionNames = transformationConnectionNames;
+      this.pipelineSpecification = pipelineSpecification;
       this.connMgr = connMgr;
       this.jobManager = jobManager;
       this.ingester = ingester;
       this.abortSet = abortSet;
-      this.outputDescriptionString = outputDescriptionString;
-      this.transformationDescriptionStrings = transformationDescriptionStrings;
       this.checkActivity = checkActivity;
     }
 
@@ -1315,8 +1264,8 @@ public class WorkerThread extends Thread
       throws ManifoldCFException, ServiceInterruption
     {
       return ingester.checkMimeTypeIndexable(
-        transformationConnectionNames,transformationDescriptionStrings,
-        outputConnectionName,outputDescriptionString,mimeType,
+        pipelineSpecification,
+        mimeType,
         checkActivity);
     }
 
@@ -1329,8 +1278,8 @@ public class WorkerThread extends Thread
       throws ManifoldCFException, ServiceInterruption
     {
       return ingester.checkDocumentIndexable(
-        transformationConnectionNames,transformationDescriptionStrings,
-        outputConnectionName,outputDescriptionString,localFile,
+        pipelineSpecification,
+        localFile,
         checkActivity);
     }
 
@@ -1343,8 +1292,8 @@ public class WorkerThread extends Thread
       throws ManifoldCFException, ServiceInterruption
     {
       return ingester.checkLengthIndexable(
-        transformationConnectionNames,transformationDescriptionStrings,
-        outputConnectionName,outputDescriptionString,length,
+        pipelineSpecification,
+        length,
         checkActivity);
     }
 
@@ -1358,8 +1307,8 @@ public class WorkerThread extends Thread
       throws ManifoldCFException, ServiceInterruption
     {
       return ingester.checkURLIndexable(
-        transformationConnectionNames,transformationDescriptionStrings,
-        outputConnectionName,outputDescriptionString,url,
+        pipelineSpecification,
+        url,
         checkActivity);
     }
 
@@ -1510,10 +1459,8 @@ public class WorkerThread extends Thread
     protected final IJobManager jobManager;
     protected final IIncrementalIngester ingester;
     protected final String connectionName;
-    protected final String outputName;
-    protected final String[] transformationConnectionNames;
-    protected final String outputDescriptionString;
-    protected final String[] transformationDescriptionStrings;
+    protected final IPipelineSpecification pipelineSpecification;
+    protected final Map<String,IPipelineSpecificationWithVersions> fetchPipelineSpecifications;
     protected final long currentTime;
     protected final Long expireInterval;
     protected final Map<String,Set<String>> forcedMetadata;
@@ -1527,8 +1474,6 @@ public class WorkerThread extends Thread
     protected final OutputActivity ingestLogger;
     protected final IReprioritizationTracker rt;
     protected final Set<String> abortSet;
-    protected final String outputVersion;
-    protected final String transformationVersion;
     protected final String parameterVersion;
     
     // We submit references in bulk, because that's way more efficient.
@@ -1551,8 +1496,9 @@ public class WorkerThread extends Thread
       IThreadContext threadContext,
       IReprioritizationTracker rt, IJobManager jobManager,
       IIncrementalIngester ingester,
-      String connectionName, String outputName, String[] transformationConnectionNames,
-      String outputDescriptionString, String[] transformationDescriptionStrings,
+      String connectionName,
+      IPipelineSpecification pipelineSpecification,
+      Map<String,IPipelineSpecificationWithVersions> fetchPipelineSpecifications,
       long currentTime,
       Long expireInterval,
       Map<String,Set<String>> forcedMetadata,
@@ -1562,7 +1508,7 @@ public class WorkerThread extends Thread
       IRepositoryConnection connection, IRepositoryConnector connector,
       IRepositoryConnectionManager connMgr, String[] legalLinkTypes, OutputActivity ingestLogger,
       Set<String> abortSet,
-      String outputVersion, String transformationVersion, String parameterVersion)
+      String parameterVersion)
     {
       this.jobID = jobID;
       this.processID = processID;
@@ -1571,10 +1517,8 @@ public class WorkerThread extends Thread
       this.jobManager = jobManager;
       this.ingester = ingester;
       this.connectionName = connectionName;
-      this.outputName = outputName;
-      this.outputDescriptionString = outputDescriptionString;
-      this.transformationConnectionNames = transformationConnectionNames;
-      this.transformationDescriptionStrings = transformationDescriptionStrings;
+      this.pipelineSpecification = pipelineSpecification;
+      this.fetchPipelineSpecifications = fetchPipelineSpecifications;
       this.currentTime = currentTime;
       this.expireInterval = expireInterval;
       this.forcedMetadata = forcedMetadata;
@@ -1587,9 +1531,7 @@ public class WorkerThread extends Thread
       this.legalLinkTypes = legalLinkTypes;
       this.ingestLogger = ingestLogger;
       this.abortSet = abortSet;
-      this.outputVersion = outputVersion;
       this.parameterVersion = parameterVersion;
-      this.transformationVersion = transformationVersion;
     }
 
     /** Clean up any dangling information, before abandoning this process activity object */
@@ -1805,7 +1747,10 @@ public class WorkerThread extends Thread
       throws ManifoldCFException, ServiceInterruption
     {
       String documentIdentifierHash = ManifoldCF.hash(documentIdentifier);
-      ingester.documentRecord(outputName,connectionName,documentIdentifierHash,version,currentTime,ingestLogger);
+      ingester.documentRecord(
+        pipelineSpecification.getBasicPipelineSpecification(),
+        connectionName,documentIdentifierHash,
+        version,currentTime,ingestLogger);
     }
 
     /** Ingest the current document.
@@ -1843,6 +1788,7 @@ public class WorkerThread extends Thread
     *@param data is the document data.  The data is closed after ingestion is complete.
     *@throws IOException only when data stream reading fails.
     */
+    @Override
     public void ingestDocumentWithException(String documentIdentifier, String version, String documentURI, RepositoryDocument data)
       throws ManifoldCFException, ServiceInterruption, IOException
     {
@@ -1871,12 +1817,10 @@ public class WorkerThread extends Thread
       }
         
       // First, we need to add into the metadata the stuff from the job description.
-      ingester.documentIngest(transformationConnectionNames,
-        transformationDescriptionStrings,
-        outputName,
-        outputDescriptionString,
+      ingester.documentIngest(
+        fetchPipelineSpecifications.get(documentIdentifierHash),
         connectionName,documentIdentifierHash,
-        version,transformationVersion,outputVersion,parameterVersion,
+        version,parameterVersion,
         connection.getACLAuthority(),
         data,currentTime,
         documentURI,
@@ -1896,7 +1840,17 @@ public class WorkerThread extends Thread
       if (version.length() == 0)
         deleteDocument(documentIdentifier);
       else
-        ingestDocument(documentIdentifier,version,null,null);
+      {
+        try
+        {
+          ingestDocumentWithException(documentIdentifier,version,null,null);
+        }
+        catch (IOException e)
+        {
+          // Should never occur, since we passed in no data
+          throw new IllegalStateException("IngestDocumentWithException threw an illegal IOException: "+e.getMessage(),e);
+        }
+      }
     }
 
     /** Delete the current document from the search engine index.  This method does NOT keep track of version
@@ -1910,7 +1864,7 @@ public class WorkerThread extends Thread
       throws ManifoldCFException, ServiceInterruption
     {
       String documentIdentifierHash = ManifoldCF.hash(documentIdentifier);
-      ingester.documentDelete(outputName,
+      ingester.documentDelete(pipelineSpecification.getBasicPipelineSpecification(),
         connectionName,documentIdentifierHash,
         ingestLogger);
     }
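
ProcessActivity keeps only the full spec; bookkeeping calls that need just the
pipeline's shape (which connections, not how they are configured) derive the
basic spec on demand.  The two delegations above reduce to:

    String hash = ManifoldCF.hash(documentIdentifier);
    // Recording a version and deleting need connection names only, so the
    // description strings carried by the full spec are never consulted here.
    IPipelineSpecificationBasic basic =
      pipelineSpecification.getBasicPipelineSpecification();
    ingester.documentRecord(basic, connectionName, hash,
      version, currentTime, ingestLogger);
    ingester.documentDelete(basic, connectionName, hash, ingestLogger);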
@@ -2227,8 +2181,7 @@ public class WorkerThread extends Thread
       throws ManifoldCFException, ServiceInterruption
     {
       return ingester.checkMimeTypeIndexable(
-        transformationConnectionNames,transformationDescriptionStrings,
-        outputName,outputDescriptionString,mimeType,
+        pipelineSpecification,mimeType,
         ingestLogger);
     }
 
@@ -2241,8 +2194,7 @@ public class WorkerThread extends Thread
       throws ManifoldCFException, ServiceInterruption
     {
       return ingester.checkDocumentIndexable(
-        transformationConnectionNames,transformationDescriptionStrings,
-        outputName,outputDescriptionString,localFile,
+        pipelineSpecification,localFile,
         ingestLogger);
     }
 
@@ -2255,8 +2207,7 @@ public class WorkerThread extends Thread
       throws ManifoldCFException, ServiceInterruption
     {
       return ingester.checkLengthIndexable(
-        transformationConnectionNames,transformationDescriptionStrings,
-        outputName,outputDescriptionString,length,
+        pipelineSpecification,length,
         ingestLogger);
     }
 
@@ -2270,8 +2221,7 @@ public class WorkerThread extends Thread
       throws ManifoldCFException, ServiceInterruption
     {
       return ingester.checkURLIndexable(
-        transformationConnectionNames,transformationDescriptionStrings,
-        outputName,outputDescriptionString,url,
+        pipelineSpecification,url,
         ingestLogger);
     }
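
All eight indexability-check delegations (mime type, local file, length, URL;
once in VersionActivity, once in ProcessActivity) collapse to the same shape:
two connection-name arrays plus two description-string arguments become the
single pipeline spec, and the ingester walks the transformation chain itself.
For example, with the wrapper signature as it plausibly reads (only the body
is shown verbatim above):

    public boolean checkMimeTypeIndexable(String mimeType)
      throws ManifoldCFException, ServiceInterruption
    {
      // The transformation and output connection names, and their description
      // strings, all travel inside pipelineSpecification now.
      return ingester.checkMimeTypeIndexable(pipelineSpecification, mimeType,
        ingestLogger);
    }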