You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/12/10 14:36:43 UTC
svn commit: r1644399 [1/2] - in /manifoldcf/trunk: ./
framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/
framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/
framework/agents/src/main/java/org/apache/manif...
Author: kwright
Date: Wed Dec 10 13:36:42 2014
New Revision: 1644399
URL: http://svn.apache.org/r1644399
Log:
Refactor IIncrementalIngester API further for efficiency and clarity.
Added:
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineConnections.java
- copied unchanged from r1644398, manifoldcf/branches/CONNECTORS-1118/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineConnections.java
Removed:
manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineConnectionsWithVersions.java
Modified:
manifoldcf/trunk/ (props changed)
manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java
manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineConnections.java
manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineSpecification.java
manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineSpecificationWithVersions.java
manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/ManifoldCF.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteThread.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecification.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecificationWithVersions.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
Propchange: manifoldcf/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec 10 13:36:42 2014
@@ -44,7 +44,7 @@
/manifoldcf/branches/CONNECTORS-1089:1635610-1635937
/manifoldcf/branches/CONNECTORS-1100:1637693-1640317
/manifoldcf/branches/CONNECTORS-1104:1640149-1640198
-/manifoldcf/branches/CONNECTORS-1118:1644108-1644196
+/manifoldcf/branches/CONNECTORS-1118:1644108-1644398
/manifoldcf/branches/CONNECTORS-120:1406712-1407974,1407982-1411043,1411049-1416451
/manifoldcf/branches/CONNECTORS-120-1:1416450-1417056
/manifoldcf/branches/CONNECTORS-13:1525862-1527182,1539324-1541634
Modified: manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java?rev=1644399&r1=1644398&r2=1644399&view=diff
==============================================================================
--- manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java (original)
+++ manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java Wed Dec 10 13:36:42 2014
@@ -83,8 +83,6 @@ public class IncrementalIngester extends
protected final IOutputConnectionManager connectionManager;
// Output connector pool manager
protected final IOutputConnectorPool outputConnectorPool;
- // Transformation connection manager
- protected final ITransformationConnectionManager transformationConnectionManager;
// Transformation connector pool manager
protected final ITransformationConnectorPool transformationConnectorPool;
@@ -98,7 +96,6 @@ public class IncrementalIngester extends
lockManager = LockManagerFactory.make(threadContext);
connectionManager = OutputConnectionManagerFactory.make(threadContext);
outputConnectorPool = OutputConnectorPoolFactory.make(threadContext);
- transformationConnectionManager = TransformationConnectionManagerFactory.make(threadContext);
transformationConnectorPool = TransformationConnectorPoolFactory.make(threadContext);
}
@@ -228,19 +225,19 @@ public class IncrementalIngester extends
}
/** Check if a date is indexable.
- *@param pipelineConnections is the IPipelineConnections object for this pipeline.
+ *@param pipelineSpecification is the IPipelineSpecification object for this pipeline.
*@param date is the date to check.
*@param activity are the activities available to this method.
*@return true if the mimeType is indexable.
*/
@Override
public boolean checkDateIndexable(
- IPipelineConnections pipelineConnections,
+ IPipelineSpecification pipelineSpecification,
Date date,
IOutputCheckActivity activity)
throws ManifoldCFException, ServiceInterruption
{
- PipelineObject pipeline = pipelineGrab(pipelineConnections);
+ PipelineObject pipeline = pipelineGrab(pipelineSpecification);
if (pipeline == null)
// A connector is not installed; treat this as a service interruption.
throw new ServiceInterruption("One or more connectors are not installed",0L);
@@ -255,19 +252,19 @@ public class IncrementalIngester extends
}
/** Check if a mime type is indexable.
- *@param pipelineConnections is the pipeline connections object for this pipeline.
+ *@param pipelineSpecification is the IPipelineSpecification object for this pipeline.
*@param mimeType is the mime type to check.
*@param activity are the activities available to this method.
*@return true if the mimeType is indexable.
*/
@Override
public boolean checkMimeTypeIndexable(
- IPipelineConnections pipelineConnections,
+ IPipelineSpecification pipelineSpecification,
String mimeType,
IOutputCheckActivity activity)
throws ManifoldCFException, ServiceInterruption
{
- PipelineObject pipeline = pipelineGrab(pipelineConnections);
+ PipelineObject pipeline = pipelineGrab(pipelineSpecification);
if (pipeline == null)
// A connector is not installed; treat this as a service interruption.
throw new ServiceInterruption("One or more connectors are not installed",0L);
@@ -282,19 +279,19 @@ public class IncrementalIngester extends
}
/** Check if a file is indexable.
- *@param pipelineConnections is the pipeline connections object for this pipeline.
+ *@param pipelineSpecification is the IPipelineSpecification object for this pipeline.
*@param localFile is the local file to check.
*@param activity are the activities available to this method.
*@return true if the local file is indexable.
*/
@Override
public boolean checkDocumentIndexable(
- IPipelineConnections pipelineConnections,
+ IPipelineSpecification pipelineSpecification,
File localFile,
IOutputCheckActivity activity)
throws ManifoldCFException, ServiceInterruption
{
- PipelineObject pipeline = pipelineGrab(pipelineConnections);
+ PipelineObject pipeline = pipelineGrab(pipelineSpecification);
if (pipeline == null)
// A connector is not installed; treat this as a service interruption.
throw new ServiceInterruption("One or more connectors are not installed",0L);
@@ -310,19 +307,19 @@ public class IncrementalIngester extends
/** Pre-determine whether a document's length is indexable by this connector. This method is used by participating repository connectors
* to help filter out documents that are too long to be indexable.
- *@param pipelineConnections is the pipeline connections object for this pipeline.
+ *@param pipelineSpecification is the IPipelineSpecification object for this pipeline.
*@param length is the length of the document.
*@param activity are the activities available to this method.
*@return true if the file is indexable.
*/
@Override
public boolean checkLengthIndexable(
- IPipelineConnections pipelineConnections,
+ IPipelineSpecification pipelineSpecification,
long length,
IOutputCheckActivity activity)
throws ManifoldCFException, ServiceInterruption
{
- PipelineObject pipeline = pipelineGrab(pipelineConnections);
+ PipelineObject pipeline = pipelineGrab(pipelineSpecification);
if (pipeline == null)
// A connector is not installed; treat this as a service interruption.
throw new ServiceInterruption("One or more connectors are not installed",0L);
@@ -338,19 +335,19 @@ public class IncrementalIngester extends
/** Pre-determine whether a document's URL is indexable by this connector. This method is used by participating repository connectors
* to help filter out documents that not indexable.
- *@param pipelineConnections is the pipeline connections object for this pipeline.
+ *@param pipelineSpecification is the IPipelineSpecification object for this pipeline.
*@param url is the url of the document.
*@param activity are the activities available to this method.
*@return true if the file is indexable.
*/
@Override
public boolean checkURLIndexable(
- IPipelineConnections pipelineConnections,
+ IPipelineSpecification pipelineSpecification,
String url,
IOutputCheckActivity activity)
throws ManifoldCFException, ServiceInterruption
{
- PipelineObject pipeline = pipelineGrab(pipelineConnections);
+ PipelineObject pipeline = pipelineGrab(pipelineSpecification);
if (pipeline == null)
// A connector is not installed; treat this as a service interruption.
throw new ServiceInterruption("One or more connectors are not installed",0L);
@@ -371,7 +368,7 @@ public class IncrementalIngester extends
*@param outputDescriptionString - the output description string
*@return the pipeline description, or null if any part of the pipeline cannot be grabbed.
*/
- protected PipelineObjectWithVersions pipelineGrabWithVersions(IPipelineConnectionsWithVersions pipelineConnections)
+ protected PipelineObjectWithVersions pipelineGrabWithVersions(IPipelineSpecificationWithVersions pipelineConnections)
throws ManifoldCFException
{
// Pick up all needed transformation connectors
@@ -421,7 +418,7 @@ public class IncrementalIngester extends
*@param outputDescriptionString - the output description string
*@return the pipeline description, or null if any part of the pipeline cannot be grabbed.
*/
- protected PipelineObject pipelineGrab(IPipelineConnections pipelineConnections)
+ protected PipelineObject pipelineGrab(IPipelineSpecification pipelineConnections)
throws ManifoldCFException
{
// Pick up all needed transformation connectors
@@ -465,16 +462,15 @@ public class IncrementalIngester extends
}
/** Get an output version string for a document.
- *@param outputConnectionName is the name of the output connection associated with this action.
+ *@param outputConnection is the output connection associated with this action.
*@param spec is the output specification.
*@return the description string.
*/
@Override
- public VersionContext getOutputDescription(String outputConnectionName, Specification spec)
+ public VersionContext getOutputDescription(IOutputConnection outputConnection, Specification spec)
throws ManifoldCFException, ServiceInterruption
{
- IOutputConnection connection = connectionManager.load(outputConnectionName);
- IOutputConnector connector = outputConnectorPool.grab(connection);
+ IOutputConnector connector = outputConnectorPool.grab(outputConnection);
if (connector == null)
// The connector is not installed; treat this as a service interruption.
throw new ServiceInterruption("Output connector not installed",0L);
@@ -484,22 +480,21 @@ public class IncrementalIngester extends
}
finally
{
- outputConnectorPool.release(connection,connector);
+ outputConnectorPool.release(outputConnection,connector);
}
}
/** Get transformation version string for a document.
- *@param transformationConnectionName is the names of the transformation connection associated with this action.
+ *@param transformationConnection is the transformation connection associated with this action.
*@param spec is the transformation specification.
*@return the description string.
*/
@Override
- public VersionContext getTransformationDescription(String transformationConnectionName, Specification spec)
+ public VersionContext getTransformationDescription(ITransformationConnection transformationConnection, Specification spec)
throws ManifoldCFException, ServiceInterruption
{
- ITransformationConnection connection = transformationConnectionManager.load(transformationConnectionName);
- ITransformationConnector connector = transformationConnectorPool.grab(connection);
+ ITransformationConnector connector = transformationConnectorPool.grab(transformationConnection);
if (connector == null)
// The connector is not installed; treat this as a service interruption.
throw new ServiceInterruption("Transformation connector not installed",0L);
@@ -509,7 +504,7 @@ public class IncrementalIngester extends
}
finally
{
- transformationConnectorPool.release(connection,connector);
+ transformationConnectorPool.release(transformationConnection,connector);
}
}
@@ -530,12 +525,10 @@ public class IncrementalIngester extends
{
if (newAuthorityNameString == null)
newAuthorityNameString = "";
- IPipelineSpecification pipelineSpecification = pipelineSpecificationWithVersions.getPipelineSpecification();
- IPipelineSpecificationBasic basicSpecification = pipelineSpecification.getBasicPipelineSpecification();
// Cycle through the outputs
- for (int i = 0; i < basicSpecification.getOutputCount(); i++)
+ for (int i = 0; i < pipelineSpecificationWithVersions.getOutputCount(); i++)
{
- int stage = basicSpecification.getOutputStage(i);
+ int stage = pipelineSpecificationWithVersions.getOutputStage(i);
String oldDocumentVersion = pipelineSpecificationWithVersions.getOutputDocumentVersionString(i);
String oldOutputVersion = pipelineSpecificationWithVersions.getOutputVersionString(i);
String oldAuthorityName = pipelineSpecificationWithVersions.getAuthorityNameString(i);
@@ -545,11 +538,11 @@ public class IncrementalIngester extends
// Look first at the version strings that aren't pipeline dependent
if (!oldDocumentVersion.equals(newDocumentVersion) ||
!oldAuthorityName.equals(newAuthorityNameString) ||
- !oldOutputVersion.equals(pipelineSpecification.getStageDescriptionString(stage).getVersionString()))
+ !oldOutputVersion.equals(pipelineSpecificationWithVersions.getStageDescriptionString(stage).getVersionString()))
return true;
// Everything matches so far. Next step is to compute a transformation path an corresponding version string.
- String newTransformationVersion = computePackedTransformationVersion(pipelineSpecification,stage);
+ String newTransformationVersion = computePackedTransformationVersion(pipelineSpecificationWithVersions,stage);
if (!pipelineSpecificationWithVersions.getOutputTransformationVersionString(i).equals(newTransformationVersion))
return true;
}
@@ -564,13 +557,12 @@ public class IncrementalIngester extends
*/
protected static String computePackedTransformationVersion(IPipelineSpecification pipelineSpecification, int stage)
{
- IPipelineSpecificationBasic basicSpecification = pipelineSpecification.getBasicPipelineSpecification();
// First, count the stages we need to represent
int stageCount = 0;
int currentStage = stage;
while (true)
{
- int newStage = basicSpecification.getStageParent(currentStage);
+ int newStage = pipelineSpecification.getStageParent(currentStage);
if (newStage == -1)
break;
stageCount++;
@@ -583,10 +575,10 @@ public class IncrementalIngester extends
currentStage = stage;
while (true)
{
- int newStage = basicSpecification.getStageParent(currentStage);
+ int newStage = pipelineSpecification.getStageParent(currentStage);
if (newStage == -1)
break;
- stageNames[stageCount] = basicSpecification.getStageConnectionName(newStage);
+ stageNames[stageCount] = pipelineSpecification.getStageConnectionName(newStage);
stageDescriptions[stageCount] = pipelineSpecification.getStageDescriptionString(newStage).getVersionString();
stageCount++;
currentStage = newStage;
@@ -674,7 +666,7 @@ public class IncrementalIngester extends
* This method is conceptually similar to documentIngest(), but does not actually take
* a document or allow it to be transformed. If there is a document already
* indexed, it is removed from the index.
- *@param pipelineConnectionsWithVersions is the pipeline connections with already-fetched output versioning information.
+ *@param pipelineSpecificationWithVersions is the pipeline specification with already-fetched output versioning information.
*@param identifierClass is the name of the space in which the identifier hash should be interpreted.
*@param identifierHash is the hashed document identifier.
*@param componentHash is the hashed component identifier, if any.
@@ -685,7 +677,7 @@ public class IncrementalIngester extends
*/
@Override
public void documentNoData(
- IPipelineConnectionsWithVersions pipelineConnectionsWithVersions,
+ IPipelineSpecificationWithVersions pipelineSpecificationWithVersions,
String identifierClass, String identifierHash, String componentHash,
String documentVersion,
String authorityName,
@@ -697,11 +689,11 @@ public class IncrementalIngester extends
if (Logging.ingest.isDebugEnabled())
{
- Logging.ingest.debug("Logging empty document '"+docKey+"' component hash "+((componentHash==null)?"(None)":("'"+componentHash+"'"))+" into output connections '"+extractOutputConnectionNames(pipelineConnectionsWithVersions.getSpecification().getBasicPipelineSpecification())+"'");
+ Logging.ingest.debug("Logging empty document '"+docKey+"' component hash "+((componentHash==null)?"(None)":("'"+componentHash+"'"))+" into output connections '"+extractOutputConnectionNames(pipelineSpecificationWithVersions)+"'");
}
// Set up a pipeline
- PipelineObjectWithVersions pipeline = pipelineGrabWithVersions(pipelineConnectionsWithVersions);
+ PipelineObjectWithVersions pipeline = pipelineGrabWithVersions(pipelineSpecificationWithVersions);
if (pipeline == null)
// A connector is not installed; treat this as a service interruption.
throw new ServiceInterruption("Pipeline connector not installed",0L);
@@ -720,7 +712,7 @@ public class IncrementalIngester extends
* method also REMOVES ALL OLD METADATA. When complete, the index will contain only the metadata
* described by the RepositoryDocument object passed to this method.
* ServiceInterruption is thrown if the document ingestion must be rescheduled.
- *@param pipelineConnectionsWithVersions is the pipeline connections with already-fetched output versioning information.
+ *@param pipelineSpecificationWithVersions is the pipeline specification with already-fetched output versioning information.
*@param identifierClass is the name of the space in which the identifier hash should be interpreted.
*@param identifierHash is the hashed document identifier.
*@param componentHash is the hashed component identifier, if any.
@@ -735,7 +727,7 @@ public class IncrementalIngester extends
*/
@Override
public boolean documentIngest(
- IPipelineConnectionsWithVersions pipelineConnectionsWithVersions,
+ IPipelineSpecificationWithVersions pipelineSpecificationWithVersions,
String identifierClass, String identifierHash, String componentHash,
String documentVersion,
String authorityName,
@@ -748,14 +740,14 @@ public class IncrementalIngester extends
if (Logging.ingest.isDebugEnabled())
{
- Logging.ingest.debug("Ingesting document '"+docKey+"' component hash "+((componentHash==null)?"(None)":("'"+componentHash+"'"))+" into output connections '"+extractOutputConnectionNames(pipelineConnectionsWithVersions.getSpecification().getBasicPipelineSpecification())+"'");
+ Logging.ingest.debug("Ingesting document '"+docKey+"' component hash "+((componentHash==null)?"(None)":("'"+componentHash+"'"))+" into output connections '"+extractOutputConnectionNames(pipelineSpecificationWithVersions)+"'");
}
// Set indexing date
data.setIndexingDate(new Date());
// Set up a pipeline
- PipelineObjectWithVersions pipeline = pipelineGrabWithVersions(pipelineConnectionsWithVersions);
+ PipelineObjectWithVersions pipeline = pipelineGrabWithVersions(pipelineSpecificationWithVersions);
if (pipeline == null)
// A connector is not installed; treat this as a service interruption.
throw new ServiceInterruption("Pipeline connector not installed",0L);
@@ -770,7 +762,7 @@ public class IncrementalIngester extends
}
/** Remove a document component from the search engine index.
- *@param pipelineSpecificationBasic is the basic pipeline specification.
+ *@param pipelineConnections is the pipeline specification.
*@param identifierClass is the name of the space in which the identifier hash should be interpreted.
*@param identifierHash is the hash of the id of the document.
*@param componentHash is the hashed component identifier, if any.
@@ -778,12 +770,12 @@ public class IncrementalIngester extends
*/
@Override
public void documentRemove(
- IPipelineSpecificationBasic pipelineSpecificationBasic,
+ IPipelineConnections pipelineConnections,
String identifierClass, String identifierHash, String componentHash,
IOutputRemoveActivity activities)
throws ManifoldCFException, ServiceInterruption
{
- documentRemoveMultiple(pipelineSpecificationBasic,
+ documentRemoveMultiple(pipelineConnections,
new String[]{identifierClass},
new String[]{identifierHash},
componentHash,
@@ -926,24 +918,24 @@ public class IncrementalIngester extends
/** Delete multiple documents from the search engine index.
- *@param pipelineSpecificationBasics are the pipeline specifications associated with the documents.
+ *@param pipelineConnections are the pipeline specifications associated with the documents.
*@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
*@param identifierHashes is tha array of document identifier hashes if the documents.
*@param activities is the object to use to log the details of the ingestion attempt. May be null.
*/
@Override
public void documentDeleteMultiple(
- IPipelineSpecificationBasic[] pipelineSpecificationBasics,
+ IPipelineConnections[] pipelineConnections,
String[] identifierClasses, String[] identifierHashes,
IOutputRemoveActivity activities)
throws ManifoldCFException, ServiceInterruption
{
// Segregate request by pipeline spec instance address. Not perfect but works in the
// environment it is used it.
- Map<IPipelineSpecificationBasic,List<Integer>> keyMap = new HashMap<IPipelineSpecificationBasic,List<Integer>>();
- for (int i = 0; i < pipelineSpecificationBasics.length; i++)
+ Map<IPipelineConnections,List<Integer>> keyMap = new HashMap<IPipelineConnections,List<Integer>>();
+ for (int i = 0; i < pipelineConnections.length; i++)
{
- IPipelineSpecificationBasic spec = pipelineSpecificationBasics[i];
+ IPipelineConnections spec = pipelineConnections[i];
List<Integer> list = keyMap.get(spec);
if (list == null)
{
@@ -954,10 +946,10 @@ public class IncrementalIngester extends
}
// Create the return array.
- Iterator<IPipelineSpecificationBasic> iter = keyMap.keySet().iterator();
+ Iterator<IPipelineConnections> iter = keyMap.keySet().iterator();
while (iter.hasNext())
{
- IPipelineSpecificationBasic spec = iter.next();
+ IPipelineConnections spec = iter.next();
List<Integer> list = keyMap.get(spec);
String[] localIdentifierClasses = new String[list.size()];
String[] localIdentifierHashes = new String[list.size()];
@@ -972,21 +964,20 @@ public class IncrementalIngester extends
}
/** Delete multiple documents from the search engine index.
- *@param pipelineSpecificationBasic is the basic pipeline specification.
+ *@param pipelineConnections is the pipeline specification.
*@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
*@param identifierHashes is tha array of document identifier hashes if the documents.
*@param activities is the object to use to log the details of the ingestion attempt. May be null.
*/
@Override
public void documentDeleteMultiple(
- IPipelineSpecificationBasic pipelineSpecificationBasic,
+ IPipelineConnections pipelineConnections,
String[] identifierClasses, String[] identifierHashes,
IOutputRemoveActivity activities)
throws ManifoldCFException, ServiceInterruption
{
- String[] outputConnectionNames = extractOutputConnectionNames(pipelineSpecificationBasic);
- // Load connection managers up front to save time
- IOutputConnection[] outputConnections = connectionManager.loadMultiple(outputConnectionNames);
+ String[] outputConnectionNames = pipelineConnections.getOutputConnectionNames();
+ IOutputConnection[] outputConnections = pipelineConnections.getOutputConnections();
// No transactions here, so we can cycle through the connection names one at a time
for (int z = 0; z < outputConnectionNames.length; z++)
@@ -1187,7 +1178,7 @@ public class IncrementalIngester extends
}
/** Remove multiple document components from the search engine index.
- *@param pipelineSpecificationBasic is the basic pipeline specification.
+ *@param pipelineConnections is the pipeline specification.
*@param identifierClasses are the names of the spaces in which the identifier hash should be interpreted.
*@param identifierHashes are the hashes of the ids of the documents.
*@param componentHash is the hashed component identifier, if any.
@@ -1195,14 +1186,13 @@ public class IncrementalIngester extends
*/
@Override
public void documentRemoveMultiple(
- IPipelineSpecificationBasic pipelineSpecificationBasic,
+ IPipelineConnections pipelineConnections,
String[] identifierClasses, String[] identifierHashes, String componentHash,
IOutputRemoveActivity activities)
throws ManifoldCFException, ServiceInterruption
{
- String[] outputConnectionNames = extractOutputConnectionNames(pipelineSpecificationBasic);
- // Load connection managers up front to save time
- IOutputConnection[] outputConnections = connectionManager.loadMultiple(outputConnectionNames);
+ String[] outputConnectionNames = pipelineConnections.getOutputConnectionNames();
+ IOutputConnection[] outputConnections = pipelineConnections.getOutputConnections();
// No transactions here, so we can cycle through the connection names one at a time
for (int z = 0; z < outputConnectionNames.length; z++)
@@ -1550,19 +1540,19 @@ public class IncrementalIngester extends
}
/** Delete a document from the search engine index.
- *@param pipelineSpecificationBasic is the basic pipeline specification.
+ *@param pipelineConnections is the pipeline specification.
*@param identifierClass is the name of the space in which the identifier hash should be interpreted.
*@param identifierHash is the hash of the id of the document.
*@param activities is the object to use to log the details of the ingestion attempt. May be null.
*/
@Override
public void documentDelete(
- IPipelineSpecificationBasic pipelineSpecificationBasic,
+ IPipelineConnections pipelineConnections,
String identifierClass, String identifierHash,
IOutputRemoveActivity activities)
throws ManifoldCFException, ServiceInterruption
{
- documentDeleteMultiple(pipelineSpecificationBasic,new String[]{identifierClass},new String[]{identifierHash},activities);
+ documentDeleteMultiple(pipelineConnections,new String[]{identifierClass},new String[]{identifierHash},activities);
}
/** Find out what URIs a SET of document URIs are currently ingested.
@@ -1960,40 +1950,44 @@ public class IncrementalIngester extends
/** Reset all documents belonging to a specific output connection, because we've got information that
* that system has been reconfigured. This will force all such documents to be reindexed the next time
* they are checked.
- *@param outputConnectionName is the name of the output connection associated with this action.
+ *@param outputConnection is the output connection associated with this action.
*/
@Override
- public void resetOutputConnection(String outputConnectionName)
+ public void resetOutputConnection(IOutputConnection outputConnection)
throws ManifoldCFException
{
+ if (outputConnection == null)
+ return;
+
// We're not going to blow away the records, but we are going to set their versions to mean, "reindex required"
HashMap map = new HashMap();
map.put(lastVersionField,null);
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
- new UnitaryClause(outputConnNameField,outputConnectionName)});
+ new UnitaryClause(outputConnNameField,outputConnection.getName())});
performUpdate(map,"WHERE "+query,list,null);
}
/** Remove all knowledge of an output index from the system. This is appropriate
* when the output index no longer exists and you wish to delete the associated job.
- *@param outputConnectionName is the name of the output connection associated with this action.
+ *@param outputConnection is the output connection associated with this action.
*/
@Override
- public void removeOutputConnection(String outputConnectionName)
+ public void removeOutputConnection(IOutputConnection outputConnection)
throws ManifoldCFException
{
- IOutputConnection connection = connectionManager.load(outputConnectionName);
-
+ if (outputConnection == null)
+ return;
+
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
- new UnitaryClause(outputConnNameField,outputConnectionName)});
+ new UnitaryClause(outputConnNameField,outputConnection.getName())});
performDelete("WHERE "+query,list,null);
// Notify the output connection of the removal of all the records for the connection
- IOutputConnector connector = outputConnectorPool.grab(connection);
+ IOutputConnector connector = outputConnectorPool.grab(outputConnection);
if (connector == null)
return;
try
@@ -2002,7 +1996,7 @@ public class IncrementalIngester extends
}
finally
{
- outputConnectorPool.release(connection,connector);
+ outputConnectorPool.release(outputConnection,connector);
}
}
@@ -2556,12 +2550,12 @@ public class IncrementalIngester extends
protected class PipelineObject
{
- public final IPipelineConnections pipelineConnections;
+ public final IPipelineSpecification pipelineConnections;
public final IOutputConnector[] outputConnectors;
public final ITransformationConnector[] transformationConnectors;
public PipelineObject(
- IPipelineConnections pipelineConnections,
+ IPipelineSpecification pipelineConnections,
ITransformationConnector[] transformationConnectors,
IOutputConnector[] outputConnectors)
{
@@ -2624,15 +2618,13 @@ public class IncrementalIngester extends
// Create the current set
Map<Integer,PipelineCheckEntryPoint> currentSet = new HashMap<Integer,PipelineCheckEntryPoint>();
// First, locate all the output stages, and enter them into the set
- IPipelineSpecification spec = pipelineConnections.getSpecification();
- IPipelineSpecificationBasic basicSpec = spec.getBasicPipelineSpecification();
- int count = basicSpec.getOutputCount();
+ int count = pipelineConnections.getOutputCount();
for (int i = 0; i < count; i++)
{
- int outputStage = basicSpec.getOutputStage(i);
+ int outputStage = pipelineConnections.getOutputStage(i);
PipelineCheckEntryPoint outputStageEntryPoint = new PipelineCheckEntryPoint(
outputConnectors[pipelineConnections.getOutputConnectionIndex(outputStage).intValue()],
- spec.getStageDescriptionString(outputStage),finalActivity);
+ pipelineConnections.getStageDescriptionString(outputStage),finalActivity);
currentSet.put(new Integer(outputStage), outputStageEntryPoint);
}
// Cycle through the "current set"
@@ -2642,9 +2634,9 @@ public class IncrementalIngester extends
int[] siblings = null;
for (Integer outputStage : currentSet.keySet())
{
- parent = basicSpec.getStageParent(outputStage.intValue());
+ parent = pipelineConnections.getStageParent(outputStage.intValue());
// Look up the children
- siblings = basicSpec.getStageChildren(parent);
+ siblings = pipelineConnections.getStageChildren(parent);
// Are all the siblings in the current set yet? If not, we can't proceed with this entry.
boolean skipToNext = false;
for (int sibling : siblings)
@@ -2679,7 +2671,7 @@ public class IncrementalIngester extends
return pcf;
PipelineCheckEntryPoint newEntry = new PipelineCheckEntryPoint(
transformationConnectors[pipelineConnections.getTransformationConnectionIndex(parent).intValue()],
- spec.getStageDescriptionString(parent),pcf);
+ pipelineConnections.getStageDescriptionString(parent),pcf);
currentSet.put(new Integer(parent), newEntry);
}
}
@@ -2687,15 +2679,15 @@ public class IncrementalIngester extends
protected class PipelineObjectWithVersions extends PipelineObject
{
- protected final IPipelineConnectionsWithVersions pipelineConnectionsWithVersions;
+ protected final IPipelineSpecificationWithVersions pipelineSpecificationWithVersions;
public PipelineObjectWithVersions(
- IPipelineConnectionsWithVersions pipelineConnectionsWithVersions,
+ IPipelineSpecificationWithVersions pipelineSpecificationWithVersions,
ITransformationConnector[] transformationConnectors,
IOutputConnector[] outputConnectors)
{
- super(pipelineConnectionsWithVersions,transformationConnectors,outputConnectors);
- this.pipelineConnectionsWithVersions = pipelineConnectionsWithVersions;
+ super(pipelineSpecificationWithVersions,transformationConnectors,outputConnectors);
+ this.pipelineSpecificationWithVersions = pipelineSpecificationWithVersions;
}
public int addOrReplaceDocumentWithException(String docKey, String componentHash, String documentURI, RepositoryDocument document, String newDocumentVersion, String authorityNameString, IOutputActivity finalActivity, long ingestTime)
@@ -2726,14 +2718,12 @@ public class IncrementalIngester extends
// Create the current set
Map<Integer,PipelineAddEntryPoint> currentSet = new HashMap<Integer,PipelineAddEntryPoint>();
// First, locate all the output stages, and enter them into the set
- IPipelineSpecificationWithVersions fullSpec = pipelineConnectionsWithVersions.getSpecificationWithVersions();
- IPipelineSpecification pipelineSpec = fullSpec.getPipelineSpecification();
- IPipelineSpecificationBasic basicSpec = pipelineSpec.getBasicPipelineSpecification();
+ IPipelineSpecificationWithVersions fullSpec = pipelineSpecificationWithVersions;
- int outputCount = basicSpec.getOutputCount();
+ int outputCount = fullSpec.getOutputCount();
for (int i = 0; i < outputCount; i++)
{
- int outputStage = basicSpec.getOutputStage(i);
+ int outputStage = fullSpec.getOutputStage(i);
// Compute whether we need to reindex this record to this output or not, based on spec.
String oldDocumentVersion = fullSpec.getOutputDocumentVersionString(i);
@@ -2742,13 +2732,13 @@ public class IncrementalIngester extends
String oldAuthorityName = fullSpec.getAuthorityNameString(i);
// Compute the transformation version string. Must always be computed if we're going to reindex, since we save it.
- String newTransformationVersion = computePackedTransformationVersion(pipelineSpec,outputStage);
+ String newTransformationVersion = computePackedTransformationVersion(fullSpec,outputStage);
boolean needToReindex = (oldDocumentVersion == null);
if (needToReindex == false)
{
needToReindex = (!oldDocumentVersion.equals(newDocumentVersion) ||
- !oldOutputVersion.equals(pipelineSpec.getStageDescriptionString(outputStage)) ||
+ !oldOutputVersion.equals(fullSpec.getStageDescriptionString(outputStage)) ||
!oldAuthorityName.equals(newAuthorityNameString));
}
if (needToReindex == false)
@@ -2756,13 +2746,13 @@ public class IncrementalIngester extends
needToReindex = (!oldTransformationVersion.equals(newTransformationVersion));
}
- int connectionIndex = pipelineConnectionsWithVersions.getOutputConnectionIndex(outputStage).intValue();
+ int connectionIndex = fullSpec.getOutputConnectionIndex(outputStage).intValue();
PipelineAddEntryPoint outputStageEntryPoint = new OutputAddEntryPoint(
outputConnectors[connectionIndex],
- pipelineSpec.getStageDescriptionString(outputStage),
- new OutputActivitiesWrapper(finalActivity,basicSpec.getStageConnectionName(outputStage)),
+ fullSpec.getStageDescriptionString(outputStage),
+ new OutputActivitiesWrapper(finalActivity,fullSpec.getStageConnectionName(outputStage)),
needToReindex,
- basicSpec.getStageConnectionName(outputStage),
+ fullSpec.getStageConnectionName(outputStage),
newTransformationVersion,
ingestTime,
newDocumentVersion,
@@ -2778,9 +2768,9 @@ public class IncrementalIngester extends
int[] siblings = null;
for (Integer outputStage : currentSet.keySet())
{
- parent = basicSpec.getStageParent(outputStage.intValue());
+ parent = fullSpec.getStageParent(outputStage.intValue());
// Look up the children
- siblings = basicSpec.getStageChildren(parent);
+ siblings = fullSpec.getStageChildren(parent);
// Are all the siblings in the current set yet? If not, we can't proceed with this entry.
boolean skipToNext = false;
for (int sibling : siblings)
@@ -2812,13 +2802,13 @@ public class IncrementalIngester extends
// Wrap the entry points in a fan-out class, which has pipe connector-like methods that fire across all the connectors.
PipelineAddFanout pcf = new PipelineAddFanout(siblingEntryPoints,
(parent==-1)?null:new TransformationRecordingActivity(finalActivity,
- basicSpec.getStageConnectionName(parent)),
+ fullSpec.getStageConnectionName(parent)),
finalActivity);
if (parent == -1)
return pcf;
PipelineAddEntryPoint newEntry = new PipelineAddEntryPoint(
- transformationConnectors[pipelineConnections.getTransformationConnectionIndex(parent).intValue()],
- pipelineSpec.getStageDescriptionString(parent),newAuthorityNameString,pcf,pcf.checkNeedToReindex());
+ transformationConnectors[fullSpec.getTransformationConnectionIndex(parent).intValue()],
+ fullSpec.getStageDescriptionString(parent),newAuthorityNameString,pcf,pcf.checkNeedToReindex());
currentSet.put(new Integer(parent), newEntry);
}
@@ -3433,202 +3423,6 @@ public class IncrementalIngester extends
return lockArray;
}
- /** Basic pipeline specification for backwards-compatible methods */
- protected static class RuntPipelineSpecificationBasic implements IPipelineSpecificationBasic
- {
- protected final String outputConnectionName;
-
- public RuntPipelineSpecificationBasic(String outputConnectionName)
- {
- this.outputConnectionName = outputConnectionName;
- }
-
- /** Get a count of all stages.
- *@return the total count of all stages.
- */
- @Override
- public int getStageCount()
- {
- return 1;
- }
-
- /** Find children of a given pipeline stage. Pass -1 to find the children of the root stage.
- *@param stage is the stage index to get the children of.
- *@return the pipeline stages that represent those children.
- */
- @Override
- public int[] getStageChildren(int stage)
- {
- if (stage == -1)
- return new int[]{0};
- return new int[0];
- }
-
- /** Find parent of a given pipeline stage. Returns -1 if there's no parent (it's the root).
- *@param stage is the stage index to get the parent of.
- *@return the pipeline stage that is the parent, or -1.
- */
- @Override
- public int getStageParent(int stage)
- {
- return -1;
- }
-
- /** Get the connection name for a pipeline stage.
- *@param stage is the stage to get the connection name for.
- *@return the connection name for that stage.
- */
- @Override
- public String getStageConnectionName(int stage)
- {
- if (stage == 0)
- return outputConnectionName;
- return null;
- }
-
- /** Check if a stage is an output stage.
- *@param stage is the stage to check.
- *@return true if the stage represents an output connection.
- */
- @Override
- public boolean checkStageOutputConnection(int stage)
- {
- return true;
- }
-
- /** Return the number of output connections.
- *@return the total number of output connections in this specification.
- */
- @Override
- public int getOutputCount()
- {
- return 1;
- }
-
- /** Given an output index, return the stage number for that output.
- *@param index is the output connection index.
- *@return the stage number.
- */
- @Override
- public int getOutputStage(int index)
- {
- return 0;
- }
-
- }
-
- /** Pipeline specification for backwards-compatible methods without pipelines */
- protected static class RuntPipelineSpecification extends RuntPipelineSpecificationBasic implements IPipelineSpecification
- {
- protected final VersionContext outputDescriptionString;
-
- public RuntPipelineSpecification(String outputConnectionName, VersionContext outputDescriptionString)
- {
- super(outputConnectionName);
- this.outputDescriptionString = outputDescriptionString;
- }
-
- /** Get the basic pipeline specification.
- *@return the specification.
- */
- @Override
- public IPipelineSpecificationBasic getBasicPipelineSpecification()
- {
- return this;
- }
-
- /** Get the description string for a pipeline stage.
- *@param stage is the stage to get the connection name for.
- *@return the description string that stage.
- */
- @Override
- public VersionContext getStageDescriptionString(int stage)
- {
- if (stage == 0)
- return outputDescriptionString;
- return null;
- }
-
- }
-
- /** Pipeline specification for backwards-compatible methods without pipelines */
- protected static class RuntPipelineSpecificationWithVersions extends RuntPipelineSpecification implements IPipelineSpecificationWithVersions
- {
- protected final String oldDocumentVersion;
- protected final String oldOutputVersion;
- protected final String oldTransformationVersion;
- protected final String oldAuthorityNameString;
-
- public RuntPipelineSpecificationWithVersions(String outputConnectionName, VersionContext outputDescriptionString,
- String oldDocumentVersion, String oldOutputVersion, String oldTransformationVersion,
- String oldAuthorityNameString)
- {
- super(outputConnectionName,outputDescriptionString);
- this.oldDocumentVersion = oldDocumentVersion;
- this.oldOutputVersion = oldOutputVersion;
- this.oldTransformationVersion = oldTransformationVersion;
- this.oldAuthorityNameString = oldAuthorityNameString;
- }
-
- /** Get pipeline specification.
- *@return the pipeline specification.
- */
- @Override
- public IPipelineSpecification getPipelineSpecification()
- {
- return this;
- }
-
- /** For a given output index, return a document version string.
- *@param index is the output index.
- *@return the document version string.
- */
- @Override
- public String getOutputDocumentVersionString(int index)
- {
- if (index == 0)
- return oldDocumentVersion;
- return null;
- }
-
- /** For a given output index, return a transformation version string.
- *@param index is the output index.
- *@return the transformation version string.
- */
- @Override
- public String getOutputTransformationVersionString(int index)
- {
- if (index == 0)
- return oldTransformationVersion;
- return null;
- }
-
- /** For a given output index, return an output version string.
- *@param index is the output index.
- *@return the output version string.
- */
- @Override
- public String getOutputVersionString(int index)
- {
- if (index == 0)
- return oldOutputVersion;
- return null;
- }
-
- /** For a given output index, return an authority name string.
- *@param index is the output index.
- *@return the authority name string.
- */
- @Override
- public String getAuthorityNameString(int index)
- {
- if (index == 0)
- return oldAuthorityNameString;
- return null;
- }
-
- }
-
/** This class passes everything through, and monitors what happens so that the
* framework can compensate for any transformation connector coding errors.
*/
Modified: manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java?rev=1644399&r1=1644398&r2=1644399&view=diff
==============================================================================
--- manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java (original)
+++ manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java Wed Dec 10 13:36:42 2014
@@ -75,79 +75,79 @@ public interface IIncrementalIngester
public String getFirstIndexedOutputConnectionName(IPipelineSpecificationBasic pipelineSpecificationBasic);
/** Get an output version string for a document.
- *@param outputConnectionName is the name of the output connection associated with this action.
+ *@param outputConnection is the output connection associated with this action.
*@param spec is the output specification.
*@return the description string.
*/
- public VersionContext getOutputDescription(String outputConnectionName, Specification spec)
+ public VersionContext getOutputDescription(IOutputConnection outputConnection, Specification spec)
throws ManifoldCFException, ServiceInterruption;
/** Get transformation version string for a document.
- *@param transformationConnectionName is the names of the transformation connection associated with this action.
+ *@param transformationConnection is the transformation connection associated with this action.
*@param spec is the transformation specification.
*@return the description string.
*/
- public VersionContext getTransformationDescription(String transformationConnectionName, Specification spec)
+ public VersionContext getTransformationDescription(ITransformationConnection transformationConnection, Specification spec)
throws ManifoldCFException, ServiceInterruption;
/** Check if a document date is indexable.
- *@param pipelineConnections is the IPipelineConnections object for this pipeline.
+ *@param pipelineSpecification is the IPipelineSpecification object for this pipeline.
*@param date is the date to check
*@param activity are the activities available to this method.
*@return true if the document with that date is indexable.
*/
public boolean checkDateIndexable(
- IPipelineConnections pipelineConnections,
+ IPipelineSpecification pipelineSpecification,
Date date,
IOutputCheckActivity activity)
throws ManifoldCFException, ServiceInterruption;
/** Check if a mime type is indexable.
- *@param pipelineConnections is the pipeline connections object for this pipeline.
+ *@param pipelineSpecification is the IPipelineSpecification object for this pipeline.
*@param mimeType is the mime type to check.
*@param activity are the activities available to this method.
*@return true if the mimeType is indexable.
*/
public boolean checkMimeTypeIndexable(
- IPipelineConnections pipelineConnections,
+ IPipelineSpecification pipelineSpecification,
String mimeType,
IOutputCheckActivity activity)
throws ManifoldCFException, ServiceInterruption;
/** Check if a file is indexable.
- *@param pipelineConnections is the pipeline connections object for this pipeline.
+ *@param pipelineSpecification is the IPipelineSpecification object for this pipeline.
*@param localFile is the local file to check.
*@param activity are the activities available to this method.
*@return true if the local file is indexable.
*/
public boolean checkDocumentIndexable(
- IPipelineConnections pipelineConnections,
+ IPipelineSpecification pipelineSpecification,
File localFile,
IOutputCheckActivity activity)
throws ManifoldCFException, ServiceInterruption;
/** Pre-determine whether a document's length is indexable by this connector. This method is used by participating repository connectors
* to help filter out documents that are too long to be indexable.
- *@param pipelineConnections is the pipeline connections object for this pipeline.
+ *@param pipelineSpecification is the IPipelineSpecification object for this pipeline.
*@param length is the length of the document.
*@param activity are the activities available to this method.
*@return true if the file is indexable.
*/
public boolean checkLengthIndexable(
- IPipelineConnections pipelineConnections,
+ IPipelineSpecification pipelineSpecification,
long length,
IOutputCheckActivity activity)
throws ManifoldCFException, ServiceInterruption;
/** Pre-determine whether a document's URL is indexable by this connector. This method is used by participating repository connectors
* to help filter out documents that are not indexable.
- *@param pipelineConnections is the pipeline connections object for this pipeline.
+ *@param pipelineSpecification is the IPipelineSpecification object for this pipeline.
*@param url is the url of the document.
*@param activity are the activities available to this method.
*@return true if the file is indexable.
*/
public boolean checkURLIndexable(
- IPipelineConnections pipelineConnections,
+ IPipelineSpecification pipelineSpecification,
String url,
IOutputCheckActivity activity)
throws ManifoldCFException, ServiceInterruption;
@@ -186,7 +186,7 @@ public interface IIncrementalIngester
* This method is conceptually similar to documentIngest(), but does not actually take
* a document or allow it to be transformed. If there is a document already
* indexed, it is removed from the index.
- *@param pipelineConnectionsWithVersions is the pipeline connections with already-fetched output versioning information.
+ *@param pipelineSpecificationWithVersions is the pipeline specification with already-fetched output versioning information.
*@param identifierClass is the name of the space in which the identifier hash should be interpreted.
*@param identifierHash is the hashed document identifier.
*@param componentHash is the hashed component identifier, if any.
@@ -196,7 +196,7 @@ public interface IIncrementalIngester
*@param activities is an object providing a set of methods that the implementer can use to perform the operation.
*/
public void documentNoData(
- IPipelineConnectionsWithVersions pipelineConnectionsWithVersions,
+ IPipelineSpecificationWithVersions pipelineSpecificationWithVersions,
String identifierClass, String identifierHash, String componentHash,
String documentVersion,
String authorityName,
@@ -209,7 +209,7 @@ public interface IIncrementalIngester
* method also REMOVES ALL OLD METADATA. When complete, the index will contain only the metadata
* described by the RepositoryDocument object passed to this method.
* ServiceInterruption is thrown if the document ingestion must be rescheduled.
- *@param pipelineConnectionsWithVersions is the pipeline connections with already-fetched output versioning information.
+ *@param pipelineSpecificationWithVersions is the pipeline specification with already-fetched output versioning information.
*@param identifierClass is the name of the space in which the identifier hash should be interpreted.
*@param identifierHash is the hashed document identifier.
*@param componentHash is the hashed component identifier, if any.
@@ -223,7 +223,7 @@ public interface IIncrementalIngester
*@throws IOException only if data stream throws an IOException.
*/
public boolean documentIngest(
- IPipelineConnectionsWithVersions pipelineConnectionsWithVersions,
+ IPipelineSpecificationWithVersions pipelineSpecificationWithVersions,
String identifierClass, String identifierHash, String componentHash,
String documentVersion,
String authorityName,
@@ -233,27 +233,27 @@ public interface IIncrementalIngester
throws ManifoldCFException, ServiceInterruption, IOException;
/** Remove a document component from the search engine index.
- *@param pipelineSpecificationBasic is the basic pipeline specification.
+ *@param pipelineConnections is the pipeline specification.
*@param identifierClass is the name of the space in which the identifier hash should be interpreted.
*@param identifierHash is the hash of the id of the document.
*@param componentHash is the hashed component identifier, if any.
*@param activities is the object to use to log the details of the ingestion attempt. May be null.
*/
public void documentRemove(
- IPipelineSpecificationBasic pipelineSpecificationBasic,
+ IPipelineConnections pipelineConnections,
String identifierClass, String identifierHash, String componentHash,
IOutputRemoveActivity activities)
throws ManifoldCFException, ServiceInterruption;
/** Remove multiple document components from the search engine index.
- *@param pipelineSpecificationBasic is the basic pipeline specification.
+ *@param pipelineConnections is the pipeline specification.
*@param identifierClasses are the names of the spaces in which the identifier hash should be interpreted.
*@param identifierHashes are the hashes of the ids of the documents.
*@param componentHash is the hashed component identifier, if any.
*@param activities is the object to use to log the details of the ingestion attempt. May be null.
*/
public void documentRemoveMultiple(
- IPipelineSpecificationBasic pipelineSpecificationBasic,
+ IPipelineConnections pipelineConnections,
String[] identifierClasses, String[] identifierHashes, String componentHash,
IOutputRemoveActivity activities)
throws ManifoldCFException, ServiceInterruption;
@@ -285,37 +285,37 @@ public interface IIncrementalIngester
throws ManifoldCFException;
/** Delete multiple documents, and their components, from the search engine index.
- *@param pipelineSpecificationBasics are the pipeline specifications associated with the documents.
+ *@param pipelineConnections are the pipeline specifications associated with the documents.
*@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
*@param identifierHashes is the array of document identifier hashes of the documents.
*@param activities is the object to use to log the details of the ingestion attempt. May be null.
*/
public void documentDeleteMultiple(
- IPipelineSpecificationBasic[] pipelineSpecificationBasics,
+ IPipelineConnections[] pipelineConnections,
String[] identifierClasses, String[] identifierHashes,
IOutputRemoveActivity activities)
throws ManifoldCFException, ServiceInterruption;
/** Delete multiple documents, and their components, from the search engine index.
- *@param pipelineSpecificationBasic is the basic pipeline specification.
+ *@param pipelineConnections is the pipeline specification.
*@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
*@param identifierHashes is the array of document identifier hashes of the documents.
*@param activities is the object to use to log the details of the ingestion attempt. May be null.
*/
public void documentDeleteMultiple(
- IPipelineSpecificationBasic pipelineSpecificationBasic,
+ IPipelineConnections pipelineConnections,
String[] identifierClasses, String[] identifierHashes,
IOutputRemoveActivity activities)
throws ManifoldCFException, ServiceInterruption;
/** Delete a document, and all its components, from the search engine index.
- *@param pipelineSpecificationBasic is the basic pipeline specification.
+ *@param pipelineConnections is the pipeline specification.
*@param identifierClass is the name of the space in which the identifier hash should be interpreted.
*@param identifierHash is the hash of the id of the document.
*@param activities is the object to use to log the details of the ingestion attempt. May be null.
*/
public void documentDelete(
- IPipelineSpecificationBasic pipelineSpecificationBasic,
+ IPipelineConnections pipelineConnections,
String identifierClass, String identifierHash,
IOutputRemoveActivity activities)
throws ManifoldCFException, ServiceInterruption;
@@ -383,16 +383,16 @@ public interface IIncrementalIngester
/** Reset all documents belonging to a specific output connection, because we've got information that
* that system has been reconfigured. This will force all such documents to be reindexed the next time
* they are checked.
- *@param outputConnectionName is the name of the output connection associated with this action.
+ *@param outputConnection is the output connection associated with this action.
*/
- public void resetOutputConnection(String outputConnectionName)
+ public void resetOutputConnection(IOutputConnection outputConnection)
throws ManifoldCFException;
/** Remove all knowledge of an output index from the system. This is appropriate
* when the output index no longer exists and you wish to delete the associated job.
- *@param outputConnectionName is the name of the output connection associated with this action.
+ *@param outputConnection is the output connection associated with this action.
*/
- public void removeOutputConnection(String outputConnectionName)
+ public void removeOutputConnection(IOutputConnection outputConnection)
throws ManifoldCFException;
}
Modified: manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineConnections.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineConnections.java?rev=1644399&r1=1644398&r2=1644399&view=diff
==============================================================================
--- manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineConnections.java (original)
+++ manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineConnections.java Wed Dec 10 13:36:42 2014
@@ -21,12 +21,8 @@ package org.apache.manifoldcf.agents.int
/** This interface caches IOutputConnection and ITransformationConnection objects
* required by an IPipelineSpecification.
*/
-public interface IPipelineConnections
+public interface IPipelineConnections extends IPipelineSpecificationBasic
{
- /** Get the underlying IPipelineSpecification object.
- */
- public IPipelineSpecification getSpecification();
-
/** Get the transformation connection names mentioned by the IPipelineSpecification
* object. */
public String[] getTransformationConnectionNames();
Modified: manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineSpecification.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineSpecification.java?rev=1644399&r1=1644398&r2=1644399&view=diff
==============================================================================
--- manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineSpecification.java (original)
+++ manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineSpecification.java Wed Dec 10 13:36:42 2014
@@ -23,15 +23,10 @@ import org.apache.manifoldcf.core.interf
/** This interface describes a multi-output pipeline, where each stage has an already-computed
* description string.
*/
-public interface IPipelineSpecification
+public interface IPipelineSpecification extends IPipelineConnections
{
public static final String _rcsid = "@(#)$Id$";
- /** Get the basic pipeline specification.
- *@return the specification.
- */
- public IPipelineSpecificationBasic getBasicPipelineSpecification();
-
/** Get the description string for a pipeline stage.
*@param stage is the stage to get the description string for.
*@return the description string for that stage.
Modified: manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineSpecificationWithVersions.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineSpecificationWithVersions.java?rev=1644399&r1=1644398&r2=1644399&view=diff
==============================================================================
--- manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineSpecificationWithVersions.java (original)
+++ manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineSpecificationWithVersions.java Wed Dec 10 13:36:42 2014
@@ -23,15 +23,10 @@ import org.apache.manifoldcf.core.interf
/** This interface describes a multi-output pipeline, with existing document version information from
* each output.
*/
-public interface IPipelineSpecificationWithVersions
+public interface IPipelineSpecificationWithVersions extends IPipelineSpecification
{
public static final String _rcsid = "@(#)$Id$";
- /** Get pipeline specification.
- *@return the pipeline specification.
- */
- public IPipelineSpecification getPipelineSpecification();
-
/** For a given output index, return a document version string.
*@param index is the output index.
*@return the document version string.
Modified: manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/ManifoldCF.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/ManifoldCF.java?rev=1644399&r1=1644398&r2=1644399&view=diff
==============================================================================
--- manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/ManifoldCF.java (original)
+++ manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/ManifoldCF.java Wed Dec 10 13:36:42 2014
@@ -148,7 +148,8 @@ public class ManifoldCF extends org.apac
{
// Blow away the incremental ingestion table first
IIncrementalIngester ingester = IncrementalIngesterFactory.make(threadContext);
- ingester.resetOutputConnection(connectionName);
+ IOutputConnectionManager outputConnectionManager = OutputConnectionManagerFactory.make(threadContext);
+ ingester.resetOutputConnection(outputConnectionManager.load(connectionName));
// Now, signal to all agents that the output connection configuration has changed. Do this second, so that there cannot be documents
// resulting from this signal that find themselves "unchanged".
AgentManagerFactory.noteOutputConnectionChange(threadContext,connectionName);
@@ -165,7 +166,8 @@ public class ManifoldCF extends org.apac
{
// Blow away the incremental ingestion table first
IIncrementalIngester ingester = IncrementalIngesterFactory.make(threadContext);
- ingester.removeOutputConnection(connectionName);
+ IOutputConnectionManager outputConnectionManager = OutputConnectionManagerFactory.make(threadContext);
+ ingester.removeOutputConnection(outputConnectionManager.load(connectionName));
// Now, signal to all agents that the output connection configuration has changed. Do this second, so that there cannot be documents
// resulting from this signal that find themselves "unchanged".
AgentManagerFactory.noteOutputConnectionChange(threadContext,connectionName);
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java?rev=1644399&r1=1644398&r2=1644399&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java Wed Dec 10 13:36:42 2014
@@ -80,6 +80,9 @@ public class DocumentCleanupThread exten
IIncrementalIngester ingester = IncrementalIngesterFactory.make(threadContext);
IJobManager jobManager = JobManagerFactory.make(threadContext);
IRepositoryConnectionManager connMgr = RepositoryConnectionManagerFactory.make(threadContext);
+ ITransformationConnectionManager transformationConnectionManager = TransformationConnectionManagerFactory.make(threadContext);
+ IOutputConnectionManager outputConnectionManager = OutputConnectionManagerFactory.make(threadContext);
+
IReprioritizationTracker rt = ReprioritizationTrackerFactory.make(threadContext);
IRepositoryConnectorPool repositoryConnectorPool = RepositoryConnectorPoolFactory.make(threadContext);
@@ -110,7 +113,7 @@ public class DocumentCleanupThread exten
IJobDescription job = dds.getJobDescription();
String connectionName = job.getConnectionName();
- IPipelineSpecificationBasic pipelineSpecificationBasic = new PipelineSpecificationBasic(job);
+ IPipelineConnections pipelineConnections = new PipelineConnections(new PipelineSpecificationBasic(job),transformationConnectionManager,outputConnectionManager);
try
{
@@ -186,7 +189,7 @@ public class DocumentCleanupThread exten
try
{
- ingester.documentDeleteMultiple(pipelineSpecificationBasic,docClassesToRemove,hashedDocsToRemove,activities);
+ ingester.documentDeleteMultiple(pipelineConnections,docClassesToRemove,hashedDocsToRemove,activities);
// Success! Label all these as needing deletion from queue.
for (int k = 0; k < arrayDocHashes.size(); k++)
{
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteThread.java?rev=1644399&r1=1644398&r2=1644399&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteThread.java Wed Dec 10 13:36:42 2014
@@ -79,7 +79,9 @@ public class DocumentDeleteThread extend
IJobManager jobManager = JobManagerFactory.make(threadContext);
IIncrementalIngester ingester = IncrementalIngesterFactory.make(threadContext);
IRepositoryConnectionManager connMgr = RepositoryConnectionManagerFactory.make(threadContext);
-
+ ITransformationConnectionManager transformationConnectionManager = TransformationConnectionManagerFactory.make(threadContext);
+ IOutputConnectionManager outputConnectionManager = OutputConnectionManagerFactory.make(threadContext);
+
// Loop
while (true)
{
@@ -100,7 +102,7 @@ public class DocumentDeleteThread extend
IJobDescription job = dds.getJobDescription();
String connectionName = job.getConnectionName();
- IPipelineSpecificationBasic pipelineSpecificationBasic = new PipelineSpecificationBasic(job);
+ IPipelineConnections pipelineConnections = new PipelineConnections(new PipelineSpecificationBasic(job),transformationConnectionManager,outputConnectionManager);
try
{
@@ -129,7 +131,7 @@ public class DocumentDeleteThread extend
try
{
- ingester.documentDeleteMultiple(pipelineSpecificationBasic,docClassesToRemove,hashedDocsToRemove,logger);
+ ingester.documentDeleteMultiple(pipelineConnections,docClassesToRemove,hashedDocsToRemove,logger);
for (int j = 0; j < dds.getCount(); j++)
{
deleteFromQueue[j] = true;
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java?rev=1644399&r1=1644398&r2=1644399&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java Wed Dec 10 13:36:42 2014
@@ -70,6 +70,8 @@ public class ExpireThread extends Thread
IIncrementalIngester ingester = IncrementalIngesterFactory.make(threadContext);
IJobManager jobManager = JobManagerFactory.make(threadContext);
IRepositoryConnectionManager connMgr = RepositoryConnectionManagerFactory.make(threadContext);
+ ITransformationConnectionManager transformationConnectionManager = TransformationConnectionManagerFactory.make(threadContext);
+ IOutputConnectionManager outputConnectionManager = OutputConnectionManagerFactory.make(threadContext);
IReprioritizationTracker rt = ReprioritizationTrackerFactory.make(threadContext);
IRepositoryConnectorPool repositoryConnectorPool = RepositoryConnectorPoolFactory.make(threadContext);
@@ -100,7 +102,7 @@ public class ExpireThread extends Thread
IJobDescription job = dds.getJobDescription();
String connectionName = job.getConnectionName();
- IPipelineSpecificationBasic pipelineSpecificationBasic = new PipelineSpecificationBasic(job);
+ IPipelineConnections pipelineConnections = new PipelineConnections(new PipelineSpecificationBasic(job),transformationConnectionManager,outputConnectionManager);
try
{
@@ -180,7 +182,7 @@ public class ExpireThread extends Thread
// If we fail, we need to put the documents back on the queue.
try
{
- ingester.documentDeleteMultiple(pipelineSpecificationBasic,docClassesToRemove,hashedDocsToRemove,activities);
+ ingester.documentDeleteMultiple(pipelineConnections,docClassesToRemove,hashedDocsToRemove,activities);
// Success! Label all these as needing deletion from queue.
for (int k = 0; k < arrayDocHashes.size(); k++)
{
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecification.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecification.java?rev=1644399&r1=1644398&r2=1644399&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecification.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecification.java Wed Dec 10 13:36:42 2014
@@ -26,37 +26,144 @@ import org.apache.manifoldcf.crawler.int
*/
public class PipelineSpecification implements IPipelineSpecification
{
- protected final IPipelineSpecificationBasic basicSpecification;
+ protected final IPipelineConnections connections;
protected final VersionContext[] pipelineDescriptionStrings;
- public PipelineSpecification(IPipelineSpecificationBasic basicSpecification, IJobDescription job, IIncrementalIngester ingester)
+ public PipelineSpecification(IPipelineConnections connections, IJobDescription job, IIncrementalIngester ingester)
throws ManifoldCFException, ServiceInterruption
{
- this.basicSpecification = basicSpecification;
- this.pipelineDescriptionStrings = new VersionContext[basicSpecification.getStageCount()];
+ this.connections = connections;
+ this.pipelineDescriptionStrings = new VersionContext[connections.getStageCount()];
for (int i = 0; i < pipelineDescriptionStrings.length; i++)
{
// Note: this needs to change when output connections become part of the pipeline
VersionContext descriptionString;
- if (basicSpecification.checkStageOutputConnection(i))
+ if (connections.checkStageOutputConnection(i))
{
- descriptionString = ingester.getOutputDescription(basicSpecification.getStageConnectionName(i),job.getPipelineStageSpecification(i));
+ descriptionString = ingester.getOutputDescription(connections.getOutputConnections()[connections.getOutputConnectionIndex(i).intValue()],job.getPipelineStageSpecification(i));
}
else
{
- descriptionString = ingester.getTransformationDescription(basicSpecification.getStageConnectionName(i),job.getPipelineStageSpecification(i));
+ descriptionString = ingester.getTransformationDescription(connections.getTransformationConnections()[connections.getTransformationConnectionIndex(i).intValue()],job.getPipelineStageSpecification(i));
}
this.pipelineDescriptionStrings[i] = descriptionString;
}
}
- /** Get the basic pipeline specification.
- *@return the specification.
+ /** Get a count of all stages.
+ *@return the total count of all stages.
*/
@Override
- public IPipelineSpecificationBasic getBasicPipelineSpecification()
+ public int getStageCount()
{
- return basicSpecification;
+ return connections.getStageCount();
+ }
+
+ /** Find children of a given pipeline stage. Pass -1 to find the children of the root stage.
+ *@param stage is the stage index to get the children of.
+ *@return the pipeline stages that represent those children.
+ */
+ @Override
+ public int[] getStageChildren(int stage)
+ {
+ return connections.getStageChildren(stage);
+ }
+
+ /** Find parent of a given pipeline stage. Returns -1 if there's no parent (it's the root).
+ *@param stage is the stage index to get the parent of.
+ *@return the pipeline stage that is the parent, or -1.
+ */
+ @Override
+ public int getStageParent(int stage)
+ {
+ return connections.getStageParent(stage);
+ }
+
+ /** Get the connection name for a pipeline stage.
+ *@param stage is the stage to get the connection name for.
+ *@return the connection name for that stage.
+ */
+ @Override
+ public String getStageConnectionName(int stage)
+ {
+ return connections.getStageConnectionName(stage);
+ }
+
+ /** Check if a stage is an output stage.
+ *@param stage is the stage to check.
+ *@return true if the stage represents an output connection.
+ */
+ @Override
+ public boolean checkStageOutputConnection(int stage)
+ {
+ return connections.checkStageOutputConnection(stage);
+ }
+
+ /** Return the number of output connections.
+ *@return the total number of output connections in this specification.
+ */
+ @Override
+ public int getOutputCount()
+ {
+ return connections.getOutputCount();
+ }
+
+ /** Given an output index, return the stage number for that output.
+ *@param index is the output connection index.
+ *@return the stage number.
+ */
+ @Override
+ public int getOutputStage(int index)
+ {
+ return connections.getOutputStage(index);
+ }
+
+ /** Get the transformation connection names mentioned by the IPipelineSpecification
+ * object. */
+ @Override
+ public String[] getTransformationConnectionNames()
+ {
+ return connections.getTransformationConnectionNames();
+ }
+
+ /** Get the transformation connection instances mentioned by the IPipelineSpecification
+ * object. */
+ @Override
+ public ITransformationConnection[] getTransformationConnections()
+ {
+ return connections.getTransformationConnections();
+ }
+
+ /** Get the output connection names mentioned by the IPipelineSpecification
+ * object. */
+ @Override
+ public String[] getOutputConnectionNames()
+ {
+ return connections.getOutputConnectionNames();
+ }
+
+ /** Get the output connection instances mentioned by the IPipelineSpecification
+ * object. */
+ @Override
+ public IOutputConnection[] getOutputConnections()
+ {
+ return connections.getOutputConnections();
+ }
+
+ /** Get the index of the transformation connection corresponding to a
+ * specific pipeline stage. */
+ @Override
+ public Integer getTransformationConnectionIndex(int stage)
+ {
+ return connections.getTransformationConnectionIndex(stage);
+ }
+
+ /** Get the index of the output connection corresponding to a
+ * specific pipeline stage. */
+ @Override
+ public Integer getOutputConnectionIndex(int stage)
+ {
+ return connections.getOutputConnectionIndex(stage);
}
/** Get the description string for a pipeline stage.
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecificationWithVersions.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecificationWithVersions.java?rev=1644399&r1=1644398&r2=1644399&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecificationWithVersions.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecificationWithVersions.java Wed Dec 10 13:36:42 2014
@@ -38,24 +38,140 @@ public class PipelineSpecificationWithVe
this.componentIDHash = componentIDHash;
}
- /** Get pipeline specification.
- *@return the pipeline specification.
+ protected DocumentIngestStatus getStatus(int index)
+ {
+ DocumentIngestStatusSet set = queuedDocument.getLastIngestedStatus(pipelineSpecification.getStageConnectionName(pipelineSpecification.getOutputStage(index)));
+ if (set == null)
+ return null;
+ return set.getComponent(componentIDHash);
+ }
+
+ /** Get a count of all stages.
+ *@return the total count of all stages.
+ */
+ @Override
+ public int getStageCount()
+ {
+ return pipelineSpecification.getStageCount();
+ }
+
+ /** Find children of a given pipeline stage. Pass -1 to find the children of the root stage.
+ *@param stage is the stage index to get the children of.
+ *@return the pipeline stages that represent those children.
+ */
+ @Override
+ public int[] getStageChildren(int stage)
+ {
+ return pipelineSpecification.getStageChildren(stage);
+ }
+
+ /** Find parent of a given pipeline stage. Returns -1 if there's no parent (it's the root).
+ *@param stage is the stage index to get the parent of.
+ *@return the pipeline stage that is the parent, or -1.
*/
@Override
- public IPipelineSpecification getPipelineSpecification()
+ public int getStageParent(int stage)
{
- return pipelineSpecification;
+ return pipelineSpecification.getStageParent(stage);
}
- protected DocumentIngestStatus getStatus(int index)
+ /** Get the connection name for a pipeline stage.
+ *@param stage is the stage to get the connection name for.
+ *@return the connection name for that stage.
+ */
+ @Override
+ public String getStageConnectionName(int stage)
{
- IPipelineSpecificationBasic basic = pipelineSpecification.getBasicPipelineSpecification();
- DocumentIngestStatusSet set = queuedDocument.getLastIngestedStatus(basic.getStageConnectionName(basic.getOutputStage(index)));
- if (set == null)
- return null;
- return set.getComponent(componentIDHash);
+ return pipelineSpecification.getStageConnectionName(stage);
+ }
+
+ /** Check if a stage is an output stage.
+ *@param stage is the stage to check.
+ *@return true if the stage represents an output connection.
+ */
+ @Override
+ public boolean checkStageOutputConnection(int stage)
+ {
+ return pipelineSpecification.checkStageOutputConnection(stage);
+ }
+
+ /** Return the number of output connections.
+ *@return the total number of output connections in this specification.
+ */
+ @Override
+ public int getOutputCount()
+ {
+ return pipelineSpecification.getOutputCount();
}
+ /** Given an output index, return the stage number for that output.
+ *@param index is the output connection index.
+ *@return the stage number.
+ */
+ @Override
+ public int getOutputStage(int index)
+ {
+ return pipelineSpecification.getOutputStage(index);
+ }
+
+ /** Get the transformation connection names mentioned by the IPipelineSpecification
+ * object. */
+ @Override
+ public String[] getTransformationConnectionNames()
+ {
+ return pipelineSpecification.getTransformationConnectionNames();
+ }
+
+ /** Get the transformation connection instances mentioned by the IPipelineSpecification
+ * object. */
+ @Override
+ public ITransformationConnection[] getTransformationConnections()
+ {
+ return pipelineSpecification.getTransformationConnections();
+ }
+
+ /** Get the output connection names mentioned by the IPipelineSpecification
+ * object. */
+ @Override
+ public String[] getOutputConnectionNames()
+ {
+ return pipelineSpecification.getOutputConnectionNames();
+ }
+
+ /** Get the output connection instances mentioned by the IPipelineSpecification
+ * object. */
+ @Override
+ public IOutputConnection[] getOutputConnections()
+ {
+ return pipelineSpecification.getOutputConnections();
+ }
+
+ /** Get the index of the transformation connection corresponding to a
+ * specific pipeline stage. */
+ @Override
+ public Integer getTransformationConnectionIndex(int stage)
+ {
+ return pipelineSpecification.getTransformationConnectionIndex(stage);
+ }
+
+ /** Get the index of the output connection corresponding to a
+ * specific pipeline stage. */
+ @Override
+ public Integer getOutputConnectionIndex(int stage)
+ {
+ return pipelineSpecification.getOutputConnectionIndex(stage);
+ }
+
+ /** Get the description string for a pipeline stage.
+ *@param stage is the stage to get the description string for.
+ *@return the description string for that stage.
+ */
+ @Override
+ public VersionContext getStageDescriptionString(int stage)
+ {
+ return pipelineSpecification.getStageDescriptionString(stage);
+ }
+
/** For a given output index, return a document version string.
*@param index is the output index.
*@return the document version string.