You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/06/15 13:49:19 UTC
svn commit: r1602680 [2/4] - in /manifoldcf/trunk: ./
framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/
framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/
framework/pull-agent/src/main/java/org/apache/m...
Modified: manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java?rev=1602680&r1=1602679&r2=1602680&view=diff
==============================================================================
--- manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java (original)
+++ manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java Sun Jun 15 11:49:19 2014
@@ -216,6 +216,21 @@ public class IncrementalIngester extends
performDelete("",null,null);
}
+ /** From a pipeline specification, get the name of the output connection that will be indexed last
+ * in the pipeline.
+ *@param pipelineSpecificationBasic is the basic pipeline specification.
+ *@return the last indexed output connection name.
+ */
+ @Override
+ public String getLastIndexedOutputConnectionName(IPipelineSpecificationBasic pipelineSpecificationBasic)
+ {
+ // It's always the last in the sequence.
+ int count = pipelineSpecificationBasic.getOutputCount();
+ if (count == 0)
+ return null;
+ return pipelineSpecificationBasic.getStageConnectionName(count-1);
+ }
+
/** Check if a mime type is indexable.
*@param outputConnectionName is the name of the output connection associated with this action.
*@param outputDescription is the output description string.
@@ -223,35 +238,31 @@ public class IncrementalIngester extends
*@return true if the mimeType is indexable.
*/
@Override
+ @Deprecated
public boolean checkMimeTypeIndexable(String outputConnectionName, String outputDescription, String mimeType)
throws ManifoldCFException, ServiceInterruption
{
- return checkMimeTypeIndexable(new String[0], new String[0],
- outputConnectionName, outputDescription,
+ return checkMimeTypeIndexable(
+ new RuntPipelineSpecification(outputConnectionName,outputDescription),
mimeType,null);
}
+
/** Check if a mime type is indexable.
- *@param transformationConnectionNames is the ordered list of transformation connection names.
- *@param transformationDescriptions is the ordered list of transformation description strings.
- *@param outputConnectionName is the name of the output connection associated with this action.
- *@param outputDescription is the output description string.
+ *@param pipelineSpecification is the pipeline specification.
*@param mimeType is the mime type to check.
*@param activity are the activities available to this method.
*@return true if the mimeType is indexable.
*/
@Override
public boolean checkMimeTypeIndexable(
- String[] transformationConnectionNames, String[] transformationDescriptions,
- String outputConnectionName, String outputDescription,
+ IPipelineSpecification pipelineSpecification,
String mimeType,
IOutputCheckActivity activity)
throws ManifoldCFException, ServiceInterruption
{
PipelineObject pipeline = pipelineGrab(
- transformationConnectionManager.loadMultiple(transformationConnectionNames),
- connectionManager.load(outputConnectionName),
- transformationDescriptions,outputDescription);
+ new PipelineConnections(pipelineSpecification));
if (pipeline == null)
// A connector is not installed; treat this as a service interruption.
throw new ServiceInterruption("One or more connectors are not installed",0L);
@@ -265,8 +276,6 @@ public class IncrementalIngester extends
}
}
-
-
/** Check if a file is indexable.
*@param outputConnectionName is the name of the output connection associated with this action.
*@param outputDescription is the output description string.
@@ -274,35 +283,30 @@ public class IncrementalIngester extends
*@return true if the local file is indexable.
*/
@Override
+ @Deprecated
public boolean checkDocumentIndexable(String outputConnectionName, String outputDescription, File localFile)
throws ManifoldCFException, ServiceInterruption
{
- return checkDocumentIndexable(new String[0], new String[0],
- outputConnectionName, outputDescription,
+ return checkDocumentIndexable(
+ new RuntPipelineSpecification(outputConnectionName,outputDescription),
localFile,null);
}
/** Check if a file is indexable.
- *@param transformationConnectionNames is the ordered list of transformation connection names.
- *@param transformationDescriptions is the ordered list of transformation description strings.
- *@param outputConnectionName is the name of the output connection associated with this action.
- *@param outputDescription is the output description string.
+ *@param pipelineSpecification is the pipeline specification.
*@param localFile is the local file to check.
*@param activity are the activities available to this method.
*@return true if the local file is indexable.
*/
@Override
public boolean checkDocumentIndexable(
- String[] transformationConnectionNames, String[] transformationDescriptions,
- String outputConnectionName, String outputDescription,
+ IPipelineSpecification pipelineSpecification,
File localFile,
IOutputCheckActivity activity)
throws ManifoldCFException, ServiceInterruption
{
PipelineObject pipeline = pipelineGrab(
- transformationConnectionManager.loadMultiple(transformationConnectionNames),
- connectionManager.load(outputConnectionName),
- transformationDescriptions,outputDescription);
+ new PipelineConnections(pipelineSpecification));
if (pipeline == null)
// A connector is not installed; treat this as a service interruption.
throw new ServiceInterruption("One or more connectors are not installed",0L);
@@ -324,36 +328,31 @@ public class IncrementalIngester extends
*@return true if the file is indexable.
*/
@Override
+ @Deprecated
public boolean checkLengthIndexable(String outputConnectionName, String outputDescription, long length)
throws ManifoldCFException, ServiceInterruption
{
- return checkLengthIndexable(new String[0], new String[0],
- outputConnectionName, outputDescription,
+ return checkLengthIndexable(
+ new RuntPipelineSpecification(outputConnectionName,outputDescription),
length,null);
}
/** Pre-determine whether a document's length is indexable by this connector. This method is used by participating repository connectors
* to help filter out documents that are too long to be indexable.
- *@param transformationConnectionNames is the ordered list of transformation connection names.
- *@param transformationDescriptions is the ordered list of transformation description strings.
- *@param outputConnectionName is the name of the output connection associated with this action.
- *@param outputDescription is the output description string.
+ *@param pipelineSpecification is the pipeline specification.
*@param length is the length of the document.
*@param activity are the activities available to this method.
*@return true if the file is indexable.
*/
@Override
public boolean checkLengthIndexable(
- String[] transformationConnectionNames, String[] transformationDescriptions,
- String outputConnectionName, String outputDescription,
+ IPipelineSpecification pipelineSpecification,
long length,
IOutputCheckActivity activity)
throws ManifoldCFException, ServiceInterruption
{
PipelineObject pipeline = pipelineGrab(
- transformationConnectionManager.loadMultiple(transformationConnectionNames),
- connectionManager.load(outputConnectionName),
- transformationDescriptions,outputDescription);
+ new PipelineConnections(pipelineSpecification));
if (pipeline == null)
// A connector is not installed; treat this as a service interruption.
throw new ServiceInterruption("One or more connectors are not installed",0L);
@@ -375,36 +374,31 @@ public class IncrementalIngester extends
*@return true if the file is indexable.
*/
@Override
+ @Deprecated
public boolean checkURLIndexable(String outputConnectionName, String outputDescription, String url)
throws ManifoldCFException, ServiceInterruption
{
- return checkURLIndexable(new String[0], new String[0],
- outputConnectionName, outputDescription,
+ return checkURLIndexable(
+ new RuntPipelineSpecification(outputConnectionName,outputDescription),
url,null);
}
/** Pre-determine whether a document's URL is indexable by this connector. This method is used by participating repository connectors
* to help filter out documents that not indexable.
- *@param transformationConnectionNames is the ordered list of transformation connection names.
- *@param transformationDescriptions is the ordered list of transformation description strings.
- *@param outputConnectionName is the name of the output connection associated with this action.
- *@param outputDescription is the output description string.
+ *@param pipelineSpecification is the pipeline specification.
*@param url is the url of the document.
*@param activity are the activities available to this method.
*@return true if the file is indexable.
*/
@Override
public boolean checkURLIndexable(
- String[] transformationConnectionNames, String[] transformationDescriptions,
- String outputConnectionName, String outputDescription,
+ IPipelineSpecification pipelineSpecification,
String url,
IOutputCheckActivity activity)
throws ManifoldCFException, ServiceInterruption
{
PipelineObject pipeline = pipelineGrab(
- transformationConnectionManager.loadMultiple(transformationConnectionNames),
- connectionManager.load(outputConnectionName),
- transformationDescriptions,outputDescription);
+ new PipelineConnections(pipelineSpecification));
if (pipeline == null)
// A connector is not installed; treat this as a service interruption.
throw new ServiceInterruption("One or more connectors are not installed",0L);
@@ -418,7 +412,6 @@ public class IncrementalIngester extends
}
}
-
/** Grab the entire pipeline.
*@param transformationConnections - the transformation connections, in order
*@param outputConnection - the output connection
@@ -426,42 +419,88 @@ public class IncrementalIngester extends
*@param outputDescriptionString - the output description string
*@return the pipeline description, or null if any part of the pipeline cannot be grabbed.
*/
- protected PipelineObject pipelineGrab(ITransformationConnection[] transformationConnections, IOutputConnection outputConnection,
- String[] transformationDescriptionStrings, String outputDescriptionString)
+ protected PipelineObjectWithVersions pipelineGrabWithVersions(PipelineConnectionsWithVersions pipelineConnections)
throws ManifoldCFException
{
// Pick up all needed transformation connectors
- String[] transformationConnectionNames = new String[transformationConnections.length];
- for (int i = 0; i < transformationConnections.length; i++)
+ ITransformationConnector[] transformationConnectors = transformationConnectorPool.grabMultiple(pipelineConnections.getTransformationConnectionNames(),pipelineConnections.getTransformationConnections());
+ for (ITransformationConnector c : transformationConnectors)
{
- transformationConnectionNames[i] = transformationConnections[i].getName();
+ if (c == null)
+ {
+ transformationConnectorPool.releaseMultiple(pipelineConnections.getTransformationConnections(),transformationConnectors);
+ return null;
+ }
}
- ITransformationConnector[] transformationConnectors = transformationConnectorPool.grabMultiple(transformationConnectionNames,transformationConnections);
+ // Pick up all needed output connectors. If this fails we have to release the transformation connectors.
+ try
+ {
+ IOutputConnector[] outputConnectors = outputConnectorPool.grabMultiple(pipelineConnections.getOutputConnectionNames(),pipelineConnections.getOutputConnections());
+ for (IOutputConnector c : outputConnectors)
+ {
+ if (c == null)
+ {
+ outputConnectorPool.releaseMultiple(pipelineConnections.getOutputConnections(),outputConnectors);
+ transformationConnectorPool.releaseMultiple(pipelineConnections.getTransformationConnections(),transformationConnectors);
+ return null;
+ }
+ }
+ return new PipelineObjectWithVersions(pipelineConnections,transformationConnectors,outputConnectors);
+ }
+ catch (Throwable e)
+ {
+ transformationConnectorPool.releaseMultiple(pipelineConnections.getTransformationConnections(),transformationConnectors);
+ if (e instanceof ManifoldCFException)
+ throw (ManifoldCFException)e;
+ else if (e instanceof RuntimeException)
+ throw (RuntimeException)e;
+ else if (e instanceof Error)
+ throw (Error)e;
+ else
+ throw new RuntimeException("Unexpected exception type: "+e.getClass().getName()+": "+e.getMessage(),e);
+ }
+ }
+
+ /** Grab the entire pipeline.
+ *@param pipelineConnections - the pipeline connections object, which supplies the transformation
+ * and output connection names and connections whose connectors are to be grabbed.
+ * Transformation connectors are grabbed first, then output connectors; if any connector
+ * comes back null, the connectors grabbed so far are released.
+ *@return the pipeline description, or null if any part of the pipeline cannot be grabbed.
+ */
+ protected PipelineObject pipelineGrab(PipelineConnections pipelineConnections)
+ throws ManifoldCFException
+ {
+ // Pick up all needed transformation connectors
+ ITransformationConnector[] transformationConnectors = transformationConnectorPool.grabMultiple(pipelineConnections.getTransformationConnectionNames(),pipelineConnections.getTransformationConnections());
for (ITransformationConnector c : transformationConnectors)
{
if (c == null)
{
- transformationConnectorPool.releaseMultiple(transformationConnections,transformationConnectors);
+ transformationConnectorPool.releaseMultiple(pipelineConnections.getTransformationConnections(),transformationConnectors);
return null;
}
}
- // Last, pick up output connector. If it fails we have to release the transformation connectors.
+ // Pick up all needed output connectors. If this fails we have to release the transformation connectors.
try
{
- IOutputConnector outputConnector = outputConnectorPool.grab(outputConnection);
- if (outputConnector == null)
+ IOutputConnector[] outputConnectors = outputConnectorPool.grabMultiple(pipelineConnections.getOutputConnectionNames(),pipelineConnections.getOutputConnections());
+ for (IOutputConnector c : outputConnectors)
{
- transformationConnectorPool.releaseMultiple(transformationConnections,transformationConnectors);
- return null;
+ if (c == null)
+ {
+ outputConnectorPool.releaseMultiple(pipelineConnections.getOutputConnections(),outputConnectors);
+ transformationConnectorPool.releaseMultiple(pipelineConnections.getTransformationConnections(),transformationConnectors);
+ return null;
+ }
}
- return new PipelineObject(transformationConnections,transformationConnectors,outputConnection,outputConnector,
- transformationDescriptionStrings,outputDescriptionString);
+ return new PipelineObject(pipelineConnections,transformationConnectors,outputConnectors);
}
catch (Throwable e)
{
- transformationConnectorPool.releaseMultiple(transformationConnections,transformationConnectors);
+ transformationConnectorPool.releaseMultiple(pipelineConnections.getTransformationConnections(),transformationConnectors);
if (e instanceof ManifoldCFException)
throw (ManifoldCFException)e;
else if (e instanceof RuntimeException)
@@ -498,35 +537,141 @@ public class IncrementalIngester extends
}
- /** Get transformation version strings for a document.
- *@param transformationConnectionNames are the names of the transformation connections associated with this action.
- *@param specs are the transformation specifications.
- *@return the description strings.
+ /** Get transformation version string for a document.
+ *@param transformationConnectionName is the name of the transformation connection associated with this action.
+ *@param spec is the transformation specification.
+ *@return the description string.
*/
- @Override
- public String[] getTransformationDescriptions(String[] transformationConnectionNames, OutputSpecification[] specs)
+ public String getTransformationDescription(String transformationConnectionName, OutputSpecification spec)
throws ManifoldCFException, ServiceInterruption
{
- String[] rval = new String[transformationConnectionNames.length];
- for (int i = 0; i < rval.length; i++)
+ ITransformationConnection connection = transformationConnectionManager.load(transformationConnectionName);
+ ITransformationConnector connector = transformationConnectorPool.grab(connection);
+ if (connector == null)
+ // The connector is not installed; treat this as a service interruption.
+ throw new ServiceInterruption("Transformation connector not installed",0L);
+ try
{
- String transformationConnectionName = transformationConnectionNames[i];
- OutputSpecification spec = specs[i];
- ITransformationConnection connection = transformationConnectionManager.load(transformationConnectionName);
- ITransformationConnector connector = transformationConnectorPool.grab(connection);
- if (connector == null)
- // The connector is not installed; treat this as a service interruption.
- throw new ServiceInterruption("Transformation connector not installed",0L);
- try
- {
- rval[i] = connector.getPipelineDescription(spec);
- }
- finally
+ return connector.getPipelineDescription(spec);
+ }
+ finally
+ {
+ transformationConnectorPool.release(connection,connector);
+ }
+ }
+
+ /** Determine whether we need to fetch or refetch a document.
+ * Pass in information including the pipeline specification with existing version info, plus new document and parameter version strings.
+ * If no outputs need to be updated, then this method will return false. If any outputs need updating, then true is returned.
+ *@param pipelineSpecificationWithVersions is the pipeline specification including new version info for all transformation and output
+ * connections.
+ *@param newDocumentVersion is the newly-determined document version.
+ *@param newParameterVersion is the newly-determined parameter version.
+ *@param newAuthorityNameString is the newly-determined authority name.
+ *@return true if the document needs to be refetched.
+ */
+ @Override
+ public boolean checkFetchDocument(
+ IPipelineSpecificationWithVersions pipelineSpecificationWithVersions,
+ String newDocumentVersion,
+ String newParameterVersion,
+ String newAuthorityNameString)
+ {
+ IPipelineSpecification pipelineSpecification = pipelineSpecificationWithVersions.getPipelineSpecification();
+ IPipelineSpecificationBasic basicSpecification = pipelineSpecification.getBasicPipelineSpecification();
+ // Empty document version has a special meaning....
+ if (newDocumentVersion.length() == 0)
+ return true;
+ // Otherwise, cycle through the outputs
+ for (int i = 0; i < basicSpecification.getOutputCount(); i++)
+ {
+ int stage = basicSpecification.getOutputStage(i);
+ String oldDocumentVersion = pipelineSpecificationWithVersions.getOutputDocumentVersionString(i);
+ String oldParameterVersion = pipelineSpecificationWithVersions.getOutputParameterVersionString(i);
+ String oldOutputVersion = pipelineSpecificationWithVersions.getOutputVersionString(i);
+ String oldAuthorityName = pipelineSpecificationWithVersions.getAuthorityNameString(i);
+ // If it looks like we never indexed this output before, we need to do it now.
+ if (oldDocumentVersion == null)
+ return true;
+ // Look first at the version strings that aren't pipeline dependent
+ if (!oldDocumentVersion.equals(newDocumentVersion) ||
+ !oldParameterVersion.equals(newParameterVersion) ||
+ !oldAuthorityName.equals(newAuthorityNameString) ||
+ !oldOutputVersion.equals(pipelineSpecification.getStageDescriptionString(stage)))
+ return true;
+
+ // Everything matches so far. Next step is to compute a transformation path and corresponding version string.
+ String newTransformationVersion = computePackedTransformationVersion(pipelineSpecification,stage);
+ if (!pipelineSpecificationWithVersions.getOutputTransformationVersionString(i).equals(newTransformationVersion))
+ return true;
+ }
+ // Everything matches, so no reindexing is needed.
+ return false;
+ }
+
+ /** Compute a transformation version given a pipeline specification and starting output stage.
+ *@param pipelineSpecification is the pipeline specification.
+ *@param stage is the stage number of the output stage.
+ *@return the transformation version string, which will be a composite of all the transformations applied.
+ */
+ protected static String computePackedTransformationVersion(IPipelineSpecification pipelineSpecification, int stage)
+ {
+ IPipelineSpecificationBasic basicSpecification = pipelineSpecification.getBasicPipelineSpecification();
+ // First, count the stages we need to represent
+ int stageCount = 0;
+ int currentStage = stage;
+ while (true)
+ {
+ int newStage = basicSpecification.getStageParent(currentStage);
+ if (newStage == -1)
+ break;
+ stageCount++;
+ currentStage = newStage;
+ }
+ // Doesn't matter how we pack it; I've chosen to do it in reverse for convenience
+ String[] stageNames = new String[stageCount];
+ String[] stageDescriptions = new String[stageCount];
+ stageCount = 0;
+ currentStage = stage;
+ while (true)
+ {
+ int newStage = basicSpecification.getStageParent(currentStage);
+ if (newStage == -1)
+ break;
+ stageNames[stageCount] = basicSpecification.getStageConnectionName(newStage);
+ stageDescriptions[stageCount] = pipelineSpecification.getStageDescriptionString(newStage);
+ stageCount++;
+ currentStage = newStage;
+ }
+ // Finally, do the packing.
+ StringBuilder sb = new StringBuilder();
+ packList(sb,stageNames,'+');
+ packList(sb,stageDescriptions,'!');
+ return sb.toString();
+ }
+
+ protected static void packList(StringBuilder output, String[] values, char delimiter)
+ {
+ pack(output,Integer.toString(values.length),delimiter);
+ int i = 0;
+ while (i < values.length)
+ {
+ pack(output,values[i++],delimiter);
+ }
+ }
+
+ protected static void pack(StringBuilder sb, String value, char delim)
+ {
+ for (int i = 0; i < value.length(); i++)
+ {
+ char x = value.charAt(i);
+ if (x == delim || x == '\\')
{
- transformationConnectorPool.release(connection,connector);
+ sb.append('\\');
}
+ sb.append(x);
}
- return rval;
+ sb.append(delim);
}
/** Record a document version, but don't ingest it.
@@ -540,40 +685,145 @@ public class IncrementalIngester extends
*@param activities is the object used in case a document needs to be removed from the output index as the result of this operation.
*/
@Override
+ @Deprecated
public void documentRecord(String outputConnectionName,
String identifierClass, String identifierHash,
String documentVersion,
long recordTime, IOutputActivity activities)
throws ManifoldCFException, ServiceInterruption
{
+ documentRecord(
+ new RuntPipelineSpecificationBasic(outputConnectionName),
+ identifierClass, identifierHash,
+ documentVersion,
+ recordTime, activities);
+ }
+
+ /** Record a document version, but don't ingest it.
+ * The purpose of this method is to keep track of the frequency at which ingestion "attempts" take place.
+ * ServiceInterruption is thrown if this action must be rescheduled.
+ *@param pipelineSpecificationBasic is the basic pipeline specification needed.
+ *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
+ *@param identifierHash is the hashed document identifier.
+ *@param documentVersion is the document version.
+ *@param recordTime is the time at which the recording took place, in milliseconds since epoch.
+ *@param activities is the object used in case a document needs to be removed from the output index as the result of this operation.
+ */
+ @Override
+ public void documentRecord(
+ IPipelineSpecificationBasic pipelineSpecificationBasic,
+ String identifierClass, String identifierHash,
+ String documentVersion, long recordTime,
+ IOutputActivity activities)
+ throws ManifoldCFException, ServiceInterruption
+ {
String docKey = makeKey(identifierClass,identifierHash);
+ String[] outputConnectionNames = extractOutputConnectionNames(pipelineSpecificationBasic);
+ IOutputConnection[] outputConnections = connectionManager.loadMultiple(outputConnectionNames);
+
if (Logging.ingest.isDebugEnabled())
{
- Logging.ingest.debug("Recording document '"+docKey+"' for output connection '"+outputConnectionName+"'");
+ Logging.ingest.debug("Recording document '"+docKey+"' for output connections '"+outputConnectionNames+"'");
}
- // With a null document URI, this can't throw either ServiceInterruption or IOException
- try
- {
- performIngestion(new ITransformationConnection[0],new String[0],
- connectionManager.load(outputConnectionName),null,
- docKey,documentVersion,null,null,null,
- null,
- null,
- recordTime,
- null,
- activities);
- }
- catch (IOException e)
- {
- throw new RuntimeException("Unexpected IOException thrown: "+e.getMessage(),e);
- }
- catch (ServiceInterruption e)
+ for (int k = 0; k < outputConnectionNames.length; k++)
{
- throw new RuntimeException("Unexpected ServiceInterruption thrown: "+e.getMessage(),e);
- }
+ String outputConnectionName = outputConnectionNames[k];
+ IOutputConnection connection = outputConnections[k];
+
+ String oldURI = null;
+ String oldURIHash = null;
+ String oldOutputVersion = null;
+
+ // Repeat if needed
+ while (true)
+ {
+ long sleepAmt = 0L;
+ try
+ {
+ // See what uri was used before for this doc, if any
+ ArrayList list = new ArrayList();
+ String query = buildConjunctionClause(list,new ClauseDescription[]{
+ new UnitaryClause(docKeyField,docKey),
+ new UnitaryClause(outputConnNameField,outputConnectionName)});
+
+ IResultSet set = performQuery("SELECT "+docURIField+","+uriHashField+","+lastOutputVersionField+" FROM "+getTableName()+
+ " WHERE "+query,list,null,null);
+
+ if (set.getRowCount() > 0)
+ {
+ IResultRow row = set.getRow(0);
+ oldURI = (String)row.getValue(docURIField);
+ oldURIHash = (String)row.getValue(uriHashField);
+ oldOutputVersion = (String)row.getValue(lastOutputVersionField);
+ }
+
+ break;
+ }
+ catch (ManifoldCFException e)
+ {
+ // Look for deadlock and retry if so
+ if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+ {
+ if (Logging.perf.isDebugEnabled())
+ Logging.perf.debug("Aborted select looking for status: "+e.getMessage());
+ sleepAmt = getSleepAmt();
+ continue;
+ }
+ throw e;
+ }
+ finally
+ {
+ sleepFor(sleepAmt);
+ }
+ }
+
+ // If uri hashes collide, then we must be sure to eliminate only the *correct* records from the table, or we will leave
+ // dangling documents around. So, all uri searches and comparisons MUST compare the actual uri as well.
+
+ // But, since we need to insure that any given URI is only worked on by one thread at a time, use critical sections
+ // to block the rare case that multiple threads try to work on the same URI.
+
+ String[] lockArray = computeLockArray(null,oldURI,outputConnectionName);
+ lockManager.enterLocks(null,null,lockArray);
+ try
+ {
+
+ ArrayList list = new ArrayList();
+
+ if (oldURI != null)
+ {
+ IOutputConnector connector = outputConnectorPool.grab(connection);
+ if (connector == null)
+ // The connector is not installed; treat this as a service interruption.
+ throw new ServiceInterruption("Output connector not installed",0L);
+ try
+ {
+ connector.removeDocument(oldURI,oldOutputVersion,new OutputRemoveActivitiesWrapper(activities,outputConnectionName));
+ }
+ finally
+ {
+ outputConnectorPool.release(connection,connector);
+ }
+ // Delete all records from the database that match the old URI, except for THIS record.
+ list.clear();
+ String query = buildConjunctionClause(list,new ClauseDescription[]{
+ new UnitaryClause(uriHashField,"=",oldURIHash),
+ new UnitaryClause(outputConnNameField,outputConnectionName)});
+ list.add(docKey);
+ performDelete("WHERE "+query+" AND "+docKeyField+"!=?",list,null);
+ }
+ // If we get here, it means we are noting that the document was examined, but that no change was required. This is signaled
+ // to noteDocumentIngest by having the null documentURI.
+ noteDocumentIngest(outputConnectionName,docKey,documentVersion,null,null,null,null,recordTime,null,null);
+ }
+ finally
+ {
+ lockManager.leaveLocks(null,null,lockArray);
+ }
+ }
}
/** Ingest a document.
@@ -651,14 +901,11 @@ public class IncrementalIngester extends
{
try
{
- return documentIngest(new String[0],
- new String[0],
- outputConnectionName,
- outputVersion,
+ return documentIngest(
+ new RuntPipelineSpecificationWithVersions(outputConnectionName,outputVersion,
+ "","","","",""),
identifierClass, identifierHash,
documentVersion,
- "",
- outputVersion,
parameterVersion,
authorityName,
data,
@@ -763,15 +1010,10 @@ public class IncrementalIngester extends
* method also REMOVES ALL OLD METADATA. When complete, the index will contain only the metadata
* described by the RepositoryDocument object passed to this method.
* ServiceInterruption is thrown if the document ingestion must be rescheduled.
- *@param transformationConnectionNames are the names of the transformation connections associated with this action.
- *@param transformationDescriptionStrings are the description strings corresponding to the transformation connection names.
- *@param outputConnectionName is the name of the output connection associated with this action.
- *@param otuputDescriptionString is the description string corresponding to the output connection.
+ *@param pipelineSpecificationWithVersions is the pipeline specification with already-fetched output versioning information.
*@param identifierClass is the name of the space in which the identifier hash should be interpreted.
*@param identifierHash is the hashed document identifier.
*@param documentVersion is the document version.
- *@param transformationVersion is the version string for the transformations to be performed on the document.
- *@param outputVersion is the output version string for the output connection.
*@param parameterVersion is the version string for the forced parameters.
*@param authorityName is the name of the authority associated with the document, if any.
*@param data is the document data. The data is closed after ingestion is complete.
@@ -783,189 +1025,40 @@ public class IncrementalIngester extends
*/
@Override
public boolean documentIngest(
- String[] transformationConnectionNames,
- String[] transformationDescriptionStrings,
- String outputConnectionName,
- String outputDescriptionString,
+ IPipelineSpecificationWithVersions pipelineSpecificationWithVersions,
String identifierClass, String identifierHash,
String documentVersion,
- String transformationVersion,
- String outputVersion,
String parameterVersion,
String authorityName,
- RepositoryDocument data,
+ RepositoryDocument document,
long ingestTime, String documentURI,
IOutputActivity activities)
throws ManifoldCFException, ServiceInterruption, IOException
{
- IOutputConnection outputConnection = connectionManager.load(outputConnectionName);
- ITransformationConnection[] transformationConnections = transformationConnectionManager.loadMultiple(transformationConnectionNames);
+ PipelineConnectionsWithVersions pipelineConnectionsWithVersions = new PipelineConnectionsWithVersions(pipelineSpecificationWithVersions);
String docKey = makeKey(identifierClass,identifierHash);
if (Logging.ingest.isDebugEnabled())
{
- Logging.ingest.debug("Ingesting document '"+docKey+"' into output connection '"+outputConnectionName+"'");
+ Logging.ingest.debug("Ingesting document '"+docKey+"' into output connections '"+extractOutputConnectionNames(pipelineSpecificationWithVersions.getPipelineSpecification().getBasicPipelineSpecification())+"'");
}
- return performIngestion(transformationConnections,transformationDescriptionStrings,
- outputConnection,outputDescriptionString,
- docKey,documentVersion,outputVersion,transformationVersion,parameterVersion,
- authorityName,
- data,
- ingestTime,documentURI,
- activities);
- }
-
- /** Do the actual ingestion, or just record it if there's nothing to ingest. */
- protected boolean performIngestion(
- ITransformationConnection[] transformationConnections, String[] transformationDescriptionStrings,
- IOutputConnection outputConnection, String outputDescriptionString,
- String docKey, String documentVersion, String outputVersion, String transformationVersion, String parameterVersion,
- String authorityNameString,
- RepositoryDocument data,
- long ingestTime, String documentURI,
- IOutputActivity activities)
- throws ManifoldCFException, ServiceInterruption, IOException
- {
- String outputConnectionName = outputConnection.getName();
-
- // No transactions; not safe because post may take too much time
-
- // First, calculate a document uri hash value
- String documentURIHash = null;
- if (documentURI != null)
- documentURIHash = ManifoldCF.hash(documentURI);
-
- String oldURI = null;
- String oldURIHash = null;
- String oldOutputVersion = null;
+ // Set indexing date
+ document.setIndexingDate(new Date());
- while (true)
- {
- long sleepAmt = 0L;
- try
- {
- // See what uri was used before for this doc, if any
- ArrayList list = new ArrayList();
- String query = buildConjunctionClause(list,new ClauseDescription[]{
- new UnitaryClause(docKeyField,docKey),
- new UnitaryClause(outputConnNameField,outputConnectionName)});
-
- IResultSet set = performQuery("SELECT "+docURIField+","+uriHashField+","+lastOutputVersionField+" FROM "+getTableName()+
- " WHERE "+query,list,null,null);
-
- if (set.getRowCount() > 0)
- {
- IResultRow row = set.getRow(0);
- oldURI = (String)row.getValue(docURIField);
- oldURIHash = (String)row.getValue(uriHashField);
- oldOutputVersion = (String)row.getValue(lastOutputVersionField);
- }
-
- break;
- }
- catch (ManifoldCFException e)
- {
- // Look for deadlock and retry if so
- if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
- {
- if (Logging.perf.isDebugEnabled())
- Logging.perf.debug("Aborted select looking for status: "+e.getMessage());
- sleepAmt = getSleepAmt();
- continue;
- }
- throw e;
- }
- finally
- {
- sleepFor(sleepAmt);
- }
- }
-
- // If uri hashes collide, then we must be sure to eliminate only the *correct* records from the table, or we will leave
- // dangling documents around. So, all uri searches and comparisons MUST compare the actual uri as well.
-
- // But, since we need to insure that any given URI is only worked on by one thread at a time, use critical sections
- // to block the rare case that multiple threads try to work on the same URI.
- int uriCount = 0;
- if (documentURI != null)
- uriCount++;
- if (oldURI != null && (documentURI == null || !documentURI.equals(oldURI)))
- uriCount++;
- String[] lockArray = new String[uriCount];
- uriCount = 0;
- if (documentURI != null)
- lockArray[uriCount++] = outputConnectionName+":"+documentURI;
- if (oldURI != null && (documentURI == null || !documentURI.equals(oldURI)))
- lockArray[uriCount++] = outputConnectionName+":"+oldURI;
-
- lockManager.enterCriticalSections(null,null,lockArray);
+ // Set up a pipeline
+ PipelineObjectWithVersions pipeline = pipelineGrabWithVersions(pipelineConnectionsWithVersions);
+ if (pipeline == null)
+ // A connector is not installed; treat this as a service interruption.
+ throw new ServiceInterruption("Pipeline connector not installed",0L);
try
{
-
- ArrayList list = new ArrayList();
-
- if (oldURI != null && (documentURI == null || !oldURI.equals(documentURI)))
- {
- // Delete all records from the database that match the old URI, except for THIS record.
- list.clear();
- String query = buildConjunctionClause(list,new ClauseDescription[]{
- new UnitaryClause(uriHashField,"=",oldURIHash),
- new UnitaryClause(outputConnNameField,"=",outputConnectionName)});
- list.add(docKey);
- performDelete("WHERE "+query+" AND "+docKeyField+"!=?",list,null);
- removeDocument(outputConnection,oldURI,oldOutputVersion,new OutputRemoveActivitiesWrapper(activities,outputConnectionName));
- }
-
- if (documentURI != null)
- {
- // Get rid of all records that match the NEW uri, except for this record.
- list.clear();
- String query = buildConjunctionClause(list,new ClauseDescription[]{
- new UnitaryClause(uriHashField,"=",documentURIHash),
- new UnitaryClause(outputConnNameField,"=",outputConnectionName)});
- list.add(docKey);
- performDelete("WHERE "+query+" AND "+ docKeyField+"!=?",list,null);
- }
-
- // Now, we know we are ready for the ingest.
- if (documentURI != null)
- {
- // Here are the cases:
- // 1) There was a service interruption before the upload started.
- // (In that case, we don't need to log anything, just reschedule).
- // 2) There was a service interruption after the document was transmitted.
- // (In that case, we should presume that the document was ingested, but
- // reschedule another import anyway.)
- // 3) Everything went OK
- // (need to log the ingestion.)
- // 4) Everything went OK, but we were told we have an illegal document.
- // (We note the ingestion because if we don't we will be forced to repeat ourselves.
- // In theory, document doesn't need to be deleted, but there is no way to signal
- // that at the moment.)
-
- // Note an ingestion before we actually try it.
- // This is a marker that says "something is there"; it has an empty version, which indicates
- // that we don't know anything about it. That means it will be reingested when the
- // next version comes along, and will be deleted if called for also.
- noteDocumentIngest(outputConnectionName,docKey,null,null,null,null,null,ingestTime,documentURI,documentURIHash);
- int result = addOrReplaceDocument(transformationConnections,transformationDescriptionStrings,
- outputConnection,outputDescriptionString,
- documentURI,data,authorityNameString,
- activities);
- noteDocumentIngest(outputConnectionName,docKey,documentVersion,transformationVersion, outputVersion,parameterVersion,authorityNameString,ingestTime,documentURI,documentURIHash);
- return result == IOutputConnector.DOCUMENTSTATUS_ACCEPTED;
- }
-
- // If we get here, it means we are noting that the document was examined, but that no change was required. This is signaled
- // to noteDocumentIngest by having the null documentURI.
- noteDocumentIngest(outputConnectionName,docKey,documentVersion,transformationVersion,outputVersion,parameterVersion,authorityNameString,ingestTime,null,null);
- return true;
+ return pipeline.addOrReplaceDocumentWithException(docKey,documentURI,document,documentVersion,parameterVersion,authorityName,activities,ingestTime) == IPipelineConnector.DOCUMENTSTATUS_ACCEPTED;
}
finally
{
- lockManager.leaveCriticalSections(null,null,lockArray);
+ pipeline.release();
}
}
@@ -977,37 +1070,66 @@ public class IncrementalIngester extends
*@param checkTime is the time at which the check took place, in milliseconds since epoch.
*/
@Override
+ @Deprecated
public void documentCheckMultiple(String outputConnectionName,
String[] identifierClasses, String[] identifierHashes,
long checkTime)
throws ManifoldCFException
{
+ documentCheckMultiple(new RuntPipelineSpecificationBasic(outputConnectionName),
+ identifierClasses,identifierHashes,checkTime);
+ }
+
+ protected static String[] extractOutputConnectionNames(IPipelineSpecificationBasic pipelineSpecificationBasic)
+ {
+ String[] rval = new String[pipelineSpecificationBasic.getOutputCount()];
+ for (int i = 0; i < rval.length; i++)
+ {
+ rval[i] = pipelineSpecificationBasic.getStageConnectionName(pipelineSpecificationBasic.getOutputStage(i));
+ }
+ return rval;
+ }
+
+ /** Note the fact that we checked a document (and found that it did not need to be ingested, because the
+ * versions agreed).
+ *@param pipelineSpecificationBasic is a pipeline specification.
+ *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
+ *@param identifierHashes are the set of document identifier hashes.
+ *@param checkTime is the time at which the check took place, in milliseconds since epoch.
+ */
+ @Override
+ public void documentCheckMultiple(
+ IPipelineSpecificationBasic pipelineSpecificationBasic,
+ String[] identifierClasses, String[] identifierHashes,
+ long checkTime)
+ throws ManifoldCFException
+ {
+ // Extract output connection names from pipeline spec
+ String[] outputConnectionNames = extractOutputConnectionNames(pipelineSpecificationBasic);
beginTransaction();
try
{
int maxClauses;
- HashMap docIDValues = new HashMap();
- int j = 0;
- while (j < identifierHashes.length)
+ Set<String> docIDValues = new HashSet<String>();
+ for (int j = 0; j < identifierHashes.length; j++)
{
String docDBString = makeKey(identifierClasses[j],identifierHashes[j]);
- docIDValues.put(docDBString,docDBString);
- j++;
+ docIDValues.add(docDBString);
}
// Now, perform n queries, each of them no larger the maxInClause in length.
// Create a list of row id's from this.
- HashMap rowIDSet = new HashMap();
- Iterator iter = docIDValues.keySet().iterator();
- j = 0;
- ArrayList list = new ArrayList();
- maxClauses = maxClausesRowIdsForDocIds(outputConnectionName);
+ Set<Long> rowIDSet = new HashSet<Long>();
+ Iterator<String> iter = docIDValues.iterator();
+ int j = 0;
+ List<String> list = new ArrayList<String>();
+ maxClauses = maxClausesRowIdsForDocIds(outputConnectionNames);
while (iter.hasNext())
{
if (j == maxClauses)
{
- findRowIdsForDocIds(outputConnectionName,rowIDSet,list);
+ findRowIdsForDocIds(outputConnectionNames,rowIDSet,list);
list.clear();
j = 0;
}
@@ -1016,27 +1138,27 @@ public class IncrementalIngester extends
}
if (j > 0)
- findRowIdsForDocIds(outputConnectionName,rowIDSet,list);
+ findRowIdsForDocIds(outputConnectionNames,rowIDSet,list);
// Now, break row id's into chunks too; submit one chunk at a time
j = 0;
- list.clear();
- iter = rowIDSet.keySet().iterator();
+ List<Long> list2 = new ArrayList<Long>();
+ Iterator<Long> iter2 = rowIDSet.iterator();
maxClauses = maxClausesUpdateRowIds();
- while (iter.hasNext())
+ while (iter2.hasNext())
{
if (j == maxClauses)
{
- updateRowIds(list,checkTime);
- list.clear();
+ updateRowIds(list2,checkTime);
+ list2.clear();
j = 0;
}
- list.add(iter.next());
+ list2.add(iter2.next());
j++;
}
if (j > 0)
- updateRowIds(list,checkTime);
+ updateRowIds(list2,checkTime);
}
catch (ManifoldCFException e)
{
@@ -1062,12 +1184,31 @@ public class IncrementalIngester extends
*@param checkTime is the time at which the check took place, in milliseconds since epoch.
*/
@Override
+ @Deprecated
public void documentCheck(String outputConnectionName,
String identifierClass, String identifierHash,
long checkTime)
throws ManifoldCFException
{
- documentCheckMultiple(outputConnectionName,new String[]{identifierClass},new String[]{identifierHash},checkTime);
+ documentCheck(new RuntPipelineSpecificationBasic(outputConnectionName),
+ identifierClass,identifierHash,checkTime);
+ }
+
+ /** Note the fact that we checked a document (and found that it did not need to be ingested, because the
+ * versions agreed).
+ *@param pipelineSpecificationBasic is a basic pipeline specification.
+ *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
+ *@param identifierHash is the hashed document identifier.
+ *@param checkTime is the time at which the check took place, in milliseconds since epoch.
+ */
+ @Override
+ public void documentCheck(
+ IPipelineSpecificationBasic pipelineSpecificationBasic,
+ String identifierClass, String identifierHash,
+ long checkTime)
+ throws ManifoldCFException
+ {
+ documentCheckMultiple(pipelineSpecificationBasic,new String[]{identifierClass},new String[]{identifierHash},checkTime);
}
/** Calculate the number of clauses.
@@ -1079,7 +1220,7 @@ public class IncrementalIngester extends
/** Update a chunk of row ids.
*/
- protected void updateRowIds(ArrayList list, long checkTime)
+ protected void updateRowIds(List<Long> list, long checkTime)
throws ManifoldCFException
{
ArrayList newList = new ArrayList();
@@ -1091,6 +1232,7 @@ public class IncrementalIngester extends
performUpdate(map,"WHERE "+query,newList,null);
}
+
/** Delete multiple documents from the search engine index.
*@param outputConnectionNames are the names of the output connections associated with this action.
*@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
@@ -1098,45 +1240,64 @@ public class IncrementalIngester extends
*@param activities is the object to use to log the details of the ingestion attempt. May be null.
*/
@Override
+ @Deprecated
public void documentDeleteMultiple(String[] outputConnectionNames,
String[] identifierClasses, String[] identifierHashes,
IOutputRemoveActivity activities)
throws ManifoldCFException, ServiceInterruption
{
-
- // Segregate request by connection names
- HashMap keyMap = new HashMap();
- int i = 0;
- while (i < outputConnectionNames.length)
+ IPipelineSpecificationBasic[] pipelineSpecs = new IPipelineSpecificationBasic[outputConnectionNames.length];
+ for (int i = 0; i < pipelineSpecs.length; i++)
{
- String outputConnectionName = outputConnectionNames[i];
- ArrayList list = (ArrayList)keyMap.get(outputConnectionName);
+ pipelineSpecs[i] = new RuntPipelineSpecificationBasic(outputConnectionNames[i]);
+ }
+ documentDeleteMultiple(pipelineSpecs,
+ identifierClasses,identifierHashes,activities);
+ }
+
+ /** Delete multiple documents from the search engine index.
+ *@param pipelineSpecificationBasics are the pipeline specifications associated with the documents.
+ *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
+ *@param identifierHashes is the array of document identifier hashes of the documents.
+ *@param activities is the object to use to log the details of the ingestion attempt. May be null.
+ */
+ @Override
+ public void documentDeleteMultiple(
+ IPipelineSpecificationBasic[] pipelineSpecificationBasics,
+ String[] identifierClasses, String[] identifierHashes,
+ IOutputRemoveActivity activities)
+ throws ManifoldCFException, ServiceInterruption
+ {
+ // Segregate request by pipeline spec instance address. Not perfect but works in the
+ // environment it is used in.
+ Map<IPipelineSpecificationBasic,List<Integer>> keyMap = new HashMap<IPipelineSpecificationBasic,List<Integer>>();
+ for (int i = 0; i < pipelineSpecificationBasics.length; i++)
+ {
+ IPipelineSpecificationBasic spec = pipelineSpecificationBasics[i];
+ List<Integer> list = keyMap.get(spec);
if (list == null)
{
- list = new ArrayList();
- keyMap.put(outputConnectionName,list);
+ list = new ArrayList<Integer>();
+ keyMap.put(spec,list);
}
list.add(new Integer(i));
- i++;
}
// Create the return array.
- Iterator iter = keyMap.keySet().iterator();
+ Iterator<IPipelineSpecificationBasic> iter = keyMap.keySet().iterator();
while (iter.hasNext())
{
- String outputConnectionName = (String)iter.next();
- ArrayList list = (ArrayList)keyMap.get(outputConnectionName);
+ IPipelineSpecificationBasic spec = iter.next();
+ List<Integer> list = keyMap.get(spec);
String[] localIdentifierClasses = new String[list.size()];
String[] localIdentifierHashes = new String[list.size()];
- i = 0;
- while (i < localIdentifierClasses.length)
+ for (int i = 0; i < localIdentifierClasses.length; i++)
{
- int index = ((Integer)list.get(i)).intValue();
+ int index = list.get(i).intValue();
localIdentifierClasses[i] = identifierClasses[index];
localIdentifierHashes[i] = identifierHashes[index];
- i++;
}
- documentDeleteMultiple(outputConnectionName,localIdentifierClasses,localIdentifierHashes,activities);
+ documentDeleteMultiple(spec,localIdentifierClasses,localIdentifierHashes,activities);
}
}
@@ -1147,210 +1308,225 @@ public class IncrementalIngester extends
*@param activities is the object to use to log the details of the ingestion attempt. May be null.
*/
@Override
+ @Deprecated
public void documentDeleteMultiple(String outputConnectionName,
String[] identifierClasses, String[] identifierHashes,
IOutputRemoveActivity activities)
throws ManifoldCFException, ServiceInterruption
{
- activities = new OutputRemoveActivitiesWrapper(activities,outputConnectionName);
+ documentDeleteMultiple(new RuntPipelineSpecificationBasic(outputConnectionName),
+ identifierClasses,identifierHashes,activities);
+ }
+
+ /** Delete multiple documents from the search engine index.
+ *@param pipelineSpecificationBasic is the basic pipeline specification.
+ *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
+ *@param identifierHashes is the array of document identifier hashes of the documents.
+ *@param activities is the object to use to log the details of the ingestion attempt. May be null.
+ */
+ @Override
+ public void documentDeleteMultiple(
+ IPipelineSpecificationBasic pipelineSpecificationBasic,
+ String[] identifierClasses, String[] identifierHashes,
+ IOutputRemoveActivity activities)
+ throws ManifoldCFException, ServiceInterruption
+ {
+ String[] outputConnectionNames = extractOutputConnectionNames(pipelineSpecificationBasic);
+ // Load connection managers up front to save time
+ IOutputConnection[] outputConnections = connectionManager.loadMultiple(outputConnectionNames);
+
+ // No transactions here, so we can cycle through the connection names one at a time
+ for (int z = 0; z < outputConnectionNames.length; z++)
+ {
+ String outputConnectionName = outputConnectionNames[z];
+ IOutputConnection connection = outputConnections[z];
- IOutputConnection connection = connectionManager.load(outputConnectionName);
+ activities = new OutputRemoveActivitiesWrapper(activities,outputConnectionName);
- if (Logging.ingest.isDebugEnabled())
- {
- int i = 0;
- while (i < identifierHashes.length)
+ if (Logging.ingest.isDebugEnabled())
{
- Logging.ingest.debug("Request to delete document '"+makeKey(identifierClasses[i],identifierHashes[i])+"' from output connection '"+outputConnectionName+"'");
- i++;
+ for (int i = 0; i < identifierHashes.length; i++)
+ {
+ Logging.ingest.debug("Request to delete document '"+makeKey(identifierClasses[i],identifierHashes[i])+"' from output connection '"+outputConnectionName+"'");
+ }
}
- }
- // No transactions. Time for the operation may exceed transaction timeout.
+ // No transactions. Time for the operation may exceed transaction timeout.
- // Obtain the current URIs of all of these.
- DeleteInfo[] uris = getDocumentURIMultiple(outputConnectionName,identifierClasses,identifierHashes);
+ // Obtain the current URIs of all of these.
+ DeleteInfo[] uris = getDocumentURIMultiple(outputConnectionName,identifierClasses,identifierHashes);
- // Grab critical section locks so that we can't attempt to ingest at the same time we are deleting.
- // (This guarantees that when this operation is complete the database reflects reality.)
- int validURIcount = 0;
- int i = 0;
- while (i < uris.length)
- {
- if (uris[i] != null && uris[i].getURI() != null)
- validURIcount++;
- i++;
- }
- String[] lockArray = new String[validURIcount];
- String[] validURIArray = new String[validURIcount];
- validURIcount = 0;
- i = 0;
- while (i < uris.length)
- {
- if (uris[i] != null && uris[i].getURI() != null)
+ // Grab critical section locks so that we can't attempt to ingest at the same time we are deleting.
+ // (This guarantees that when this operation is complete the database reflects reality.)
+ int validURIcount = 0;
+ for (int i = 0; i < uris.length; i++)
{
- validURIArray[validURIcount] = uris[i].getURI();
- lockArray[validURIcount] = outputConnectionName+":"+validURIArray[validURIcount];
- validURIcount++;
+ if (uris[i] != null && uris[i].getURI() != null)
+ validURIcount++;
}
- i++;
- }
-
- lockManager.enterCriticalSections(null,null,lockArray);
- try
- {
- // Fetch the document URIs for the listed documents
- int j = 0;
- while (j < uris.length)
+ String[] lockArray = new String[validURIcount];
+ String[] validURIArray = new String[validURIcount];
+ validURIcount = 0;
+ for (int i = 0; i < uris.length; i++)
{
- if (uris[j] != null && uris[j].getURI() != null)
- removeDocument(connection,uris[j].getURI(),uris[j].getOutputVersion(),activities);
- j++;
+ if (uris[i] != null && uris[i].getURI() != null)
+ {
+ validURIArray[validURIcount] = uris[i].getURI();
+ lockArray[validURIcount] = outputConnectionName+":"+validURIArray[validURIcount];
+ validURIcount++;
+ }
}
- // Now, get rid of all rows that match the given uris.
- // Do the queries together, then the deletes
- beginTransaction();
+ lockManager.enterLocks(null,null,lockArray);
try
{
- // The basic process is this:
- // 1) Come up with a set of urihash values
- // 2) Find the matching, corresponding id values
- // 3) Delete the rows corresponding to the id values, in sequence
-
- // Process (1 & 2) has to be broken down into chunks that contain the maximum
- // number of doc hash values each. We need to avoid repeating doc hash values,
- // so the first step is to come up with ALL the doc hash values before looping
- // over them.
-
- int maxClauses;
-
- // Find all the documents that match this set of URIs
- HashMap docURIHashValues = new HashMap();
- HashMap docURIValues = new HashMap();
- j = 0;
- while (j < validURIArray.length)
+ // Fetch the document URIs for the listed documents
+ for (int i = 0; i < uris.length; i++)
{
- String docDBString = validURIArray[j++];
- String docDBHashString = ManifoldCF.hash(docDBString);
- docURIValues.put(docDBString,docDBString);
- docURIHashValues.put(docDBHashString,docDBHashString);
+ if (uris[i] != null && uris[i].getURI() != null)
+ removeDocument(connection,uris[i].getURI(),uris[i].getOutputVersion(),activities);
}
- // Now, perform n queries, each of them no larger the maxInClause in length.
- // Create a list of row id's from this.
- HashMap rowIDSet = new HashMap();
- Iterator iter = docURIHashValues.keySet().iterator();
- j = 0;
- ArrayList hashList = new ArrayList();
- maxClauses = maxClausesRowIdsForURIs(outputConnectionName);
- while (iter.hasNext())
+ // Now, get rid of all rows that match the given uris.
+ // Do the queries together, then the deletes
+ beginTransaction();
+ try
{
- if (j == maxClauses)
+ // The basic process is this:
+ // 1) Come up with a set of urihash values
+ // 2) Find the matching, corresponding id values
+ // 3) Delete the rows corresponding to the id values, in sequence
+
+ // Process (1 & 2) has to be broken down into chunks that contain the maximum
+ // number of doc hash values each. We need to avoid repeating doc hash values,
+ // so the first step is to come up with ALL the doc hash values before looping
+ // over them.
+
+ int maxClauses;
+
+ // Find all the documents that match this set of URIs
+ Set<String> docURIHashValues = new HashSet<String>();
+ Set<String> docURIValues = new HashSet<String>();
+ for (String docDBString : validURIArray)
{
- findRowIdsForURIs(outputConnectionName,rowIDSet,docURIValues,hashList);
- hashList.clear();
- j = 0;
+ String docDBHashString = ManifoldCF.hash(docDBString);
+ docURIValues.add(docDBString);
+ docURIHashValues.add(docDBHashString);
}
- hashList.add(iter.next());
- j++;
- }
- if (j > 0)
- findRowIdsForURIs(outputConnectionName,rowIDSet,docURIValues,hashList);
+ // Now, perform n queries, each of them no larger than maxInClause in length.
+ // Create a list of row id's from this.
+ Set<Long> rowIDSet = new HashSet<Long>();
+ Iterator<String> iter = docURIHashValues.iterator();
+ int j = 0;
+ List<String> hashList = new ArrayList<String>();
+ maxClauses = maxClausesRowIdsForURIs(outputConnectionName);
+ while (iter.hasNext())
+ {
+ if (j == maxClauses)
+ {
+ findRowIdsForURIs(outputConnectionName,rowIDSet,docURIValues,hashList);
+ hashList.clear();
+ j = 0;
+ }
+ hashList.add(iter.next());
+ j++;
+ }
- // Next, go through the list of row IDs, and delete them in chunks
- j = 0;
- ArrayList list = new ArrayList();
- iter = rowIDSet.keySet().iterator();
- maxClauses = maxClausesDeleteRowIds();
- while (iter.hasNext())
- {
- if (j == maxClauses)
+ if (j > 0)
+ findRowIdsForURIs(outputConnectionName,rowIDSet,docURIValues,hashList);
+
+ // Next, go through the list of row IDs, and delete them in chunks
+ j = 0;
+ List<Long> list = new ArrayList<Long>();
+ Iterator<Long> iter2 = rowIDSet.iterator();
+ maxClauses = maxClausesDeleteRowIds();
+ while (iter2.hasNext())
{
- deleteRowIds(list);
- list.clear();
- j = 0;
+ if (j == maxClauses)
+ {
+ deleteRowIds(list);
+ list.clear();
+ j = 0;
+ }
+ list.add(iter2.next());
+ j++;
}
- list.add(iter.next());
- j++;
- }
- if (j > 0)
- deleteRowIds(list);
+ if (j > 0)
+ deleteRowIds(list);
- // Now, find the set of documents that remain that match the document identifiers.
- HashMap docIdValues = new HashMap();
- j = 0;
- while (j < identifierHashes.length)
- {
- String docDBString = makeKey(identifierClasses[j],identifierHashes[j]);
- docIdValues.put(docDBString,docDBString);
- j++;
- }
+ // Now, find the set of documents that remain that match the document identifiers.
+ Set<String> docIdValues = new HashSet<String>();
+ for (int i = 0; i < identifierHashes.length; i++)
+ {
+ String docDBString = makeKey(identifierClasses[i],identifierHashes[i]);
+ docIdValues.add(docDBString);
+ }
- // Now, perform n queries, each of them no larger the maxInClause in length.
- // Create a list of row id's from this.
- rowIDSet.clear();
- iter = docIdValues.keySet().iterator();
- j = 0;
- list.clear();
- maxClauses = maxClausesRowIdsForDocIds(outputConnectionName);
- while (iter.hasNext())
- {
- if (j == maxClauses)
+ // Now, perform n queries, each of them no larger than maxInClause in length.
+ // Create a list of row id's from this.
+ rowIDSet.clear();
+ iter = docIdValues.iterator();
+ j = 0;
+ List<String> list2 = new ArrayList<String>();
+ maxClauses = maxClausesRowIdsForDocIds(outputConnectionName);
+ while (iter.hasNext())
{
- findRowIdsForDocIds(outputConnectionName,rowIDSet,list);
- list.clear();
- j = 0;
+ if (j == maxClauses)
+ {
+ findRowIdsForDocIds(outputConnectionName,rowIDSet,list2);
+ list2.clear();
+ j = 0;
+ }
+ list2.add(iter.next());
+ j++;
}
- list.add(iter.next());
- j++;
- }
- if (j > 0)
- findRowIdsForDocIds(outputConnectionName,rowIDSet,list);
+ if (j > 0)
+ findRowIdsForDocIds(outputConnectionName,rowIDSet,list2);
- // Next, go through the list of row IDs, and delete them in chunks
- j = 0;
- list.clear();
- iter = rowIDSet.keySet().iterator();
- maxClauses = maxClausesDeleteRowIds();
- while (iter.hasNext())
- {
- if (j == maxClauses)
+ // Next, go through the list of row IDs, and delete them in chunks
+ j = 0;
+ list.clear();
+ iter2 = rowIDSet.iterator();
+ maxClauses = maxClausesDeleteRowIds();
+ while (iter2.hasNext())
{
- deleteRowIds(list);
- list.clear();
- j = 0;
+ if (j == maxClauses)
+ {
+ deleteRowIds(list);
+ list.clear();
+ j = 0;
+ }
+ list.add(iter2.next());
+ j++;
}
- list.add(iter.next());
- j++;
- }
- if (j > 0)
- deleteRowIds(list);
+ if (j > 0)
+ deleteRowIds(list);
- }
- catch (ManifoldCFException e)
- {
- signalRollback();
- throw e;
- }
- catch (Error e)
- {
- signalRollback();
- throw e;
+ }
+ catch (ManifoldCFException e)
+ {
+ signalRollback();
+ throw e;
+ }
+ catch (Error e)
+ {
+ signalRollback();
+ throw e;
+ }
+ finally
+ {
+ endTransaction();
+ }
}
finally
{
- endTransaction();
+ lockManager.leaveLocks(null,null,lockArray);
}
}
- finally
- {
- lockManager.leaveCriticalSections(null,null,lockArray);
- }
}
/** Calculate the clauses.
@@ -1364,7 +1540,7 @@ public class IncrementalIngester extends
/** Given values and parameters corresponding to a set of hash values, add corresponding
* table row id's to the output map.
*/
- protected void findRowIdsForURIs(String outputConnectionName, HashMap rowIDSet, HashMap uris, ArrayList hashParamValues)
+ protected void findRowIdsForURIs(String outputConnectionName, Set<Long> rowIDSet, Set<String> uris, List<String> hashParamValues)
throws ManifoldCFException
{
ArrayList list = new ArrayList();
@@ -1374,18 +1550,17 @@ public class IncrementalIngester extends
IResultSet set = performQuery("SELECT "+idField+","+docURIField+" FROM "+
getTableName()+" WHERE "+query,list,null,null);
-
- int i = 0;
- while (i < set.getRowCount())
+
+ for (int i = 0; i < set.getRowCount(); i++)
{
- IResultRow row = set.getRow(i++);
+ IResultRow row = set.getRow(i);
String docURI = (String)row.getValue(docURIField);
if (docURI != null && docURI.length() > 0)
{
- if (uris.get(docURI) != null)
+ if (uris.contains(docURI))
{
Long rowID = (Long)row.getValue(idField);
- rowIDSet.put(rowID,rowID);
+ rowIDSet.add(rowID);
}
}
}
@@ -1398,11 +1573,19 @@ public class IncrementalIngester extends
return findConjunctionClauseMax(new ClauseDescription[]{
new UnitaryClause(outputConnNameField,outputConnectionName)});
}
+
+ /** Calculate the maximum number of doc ids we should use.
+ */
+ protected int maxClausesRowIdsForDocIds(String[] outputConnectionNames)
+ {
+ return findConjunctionClauseMax(new ClauseDescription[]{
+ new MultiClause(outputConnNameField,outputConnectionNames)});
+ }
/** Given values and parameters corresponding to a set of hash values, add corresponding
* table row id's to the output map.
*/
- protected void findRowIdsForDocIds(String outputConnectionName, HashMap rowIDSet, ArrayList paramValues)
+ protected void findRowIdsForDocIds(String outputConnectionName, Set<Long> rowIDSet, List<String> paramValues)
throws ManifoldCFException
{
ArrayList list = new ArrayList();
@@ -1412,13 +1595,34 @@ public class IncrementalIngester extends
IResultSet set = performQuery("SELECT "+idField+" FROM "+
getTableName()+" WHERE "+query,list,null,null);
+
+ for (int i = 0; i < set.getRowCount(); i++)
+ {
+ IResultRow row = set.getRow(i);
+ Long rowID = (Long)row.getValue(idField);
+ rowIDSet.add(rowID);
+ }
+ }
+
+ /** Given values and parameters corresponding to a set of hash values, add corresponding
+ * table row id's to the output map.
+ */
+ protected void findRowIdsForDocIds(String[] outputConnectionNames, Set<Long> rowIDSet, List<String> paramValues)
+ throws ManifoldCFException
+ {
+ ArrayList list = new ArrayList();
+ String query = buildConjunctionClause(list,new ClauseDescription[]{
+ new MultiClause(docKeyField,paramValues),
+ new MultiClause(outputConnNameField,outputConnectionNames)});
- int i = 0;
- while (i < set.getRowCount())
+ IResultSet set = performQuery("SELECT "+idField+" FROM "+
+ getTableName()+" WHERE "+query,list,null,null);
+
+ for (int i = 0; i < set.getRowCount(); i++)
{
- IResultRow row = set.getRow(i++);
+ IResultRow row = set.getRow(i);
Long rowID = (Long)row.getValue(idField);
- rowIDSet.put(rowID,rowID);
+ rowIDSet.add(rowID);
}
}
@@ -1431,7 +1635,7 @@ public class IncrementalIngester extends
/** Delete a chunk of row ids.
*/
- protected void deleteRowIds(ArrayList list)
+ protected void deleteRowIds(List<Long> list)
throws ManifoldCFException
{
ArrayList newList = new ArrayList();
@@ -1447,12 +1651,30 @@ public class IncrementalIngester extends
*@param activities is the object to use to log the details of the ingestion attempt. May be null.
*/
@Override
+ @Deprecated
public void documentDelete(String outputConnectionName,
String identifierClass, String identifierHash,
IOutputRemoveActivity activities)
throws ManifoldCFException, ServiceInterruption
{
- documentDeleteMultiple(outputConnectionName,new String[]{identifierClass},new String[]{identifierHash},activities);
+ documentDelete(new RuntPipelineSpecificationBasic(outputConnectionName),
+ identifierClass,identifierHash,activities);
+ }
+
+ /** Delete a document from the search engine index.
+ *@param pipelineSpecificationBasic is the basic pipeline specification.
+ *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
+ *@param identifierHash is the hash of the id of the document.
+ *@param activities is the object to use to log the details of the ingestion attempt. May be null.
+ */
+ @Override
+ public void documentDelete(
+ IPipelineSpecificationBasic pipelineSpecificationBasic,
+ String identifierClass, String identifierHash,
+ IOutputRemoveActivity activities)
+ throws ManifoldCFException, ServiceInterruption
+ {
+ documentDeleteMultiple(pipelineSpecificationBasic,new String[]{identifierClass},new String[]{identifierHash},activities);
}
/** Find out what URIs a SET of document URIs are currently ingested.
@@ -1464,22 +1686,20 @@ public class IncrementalIngester extends
throws ManifoldCFException
{
DeleteInfo[] rval = new DeleteInfo[identifierHashes.length];
- HashMap map = new HashMap();
- int i = 0;
- while (i < identifierHashes.length)
+ Map<String,Integer> map = new HashMap<String,Integer>();
+ for (int i = 0; i < identifierHashes.length; i++)
{
map.put(makeKey(identifierClasses[i],identifierHashes[i]),new Integer(i));
rval[i] = null;
- i++;
}
beginTransaction();
try
{
- ArrayList list = new ArrayList();
+ List<String> list = new ArrayList<String>();
int maxCount = maxClauseDocumentURIChunk(outputConnectionName);
int j = 0;
- Iterator iter = map.keySet().iterator();
+ Iterator<String> iter = map.keySet().iterator();
while (iter.hasNext())
{
if (j == maxCount)
@@ -1511,100 +1731,85 @@ public class IncrementalIngester extends
}
}
- /** Look up ingestion data for a SET of documents.
- *@param outputConnectionNames are the names of the output connections associated with this action.
+ /** Look up ingestion data for a set of documents.
+ *@param rval is a map of output key to document data, in no particular order, which will be loaded with all matching results.
+ *@param pipelineSpecificationBasics are the pipeline specifications corresponding to the identifier classes and hashes.
*@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
*@param identifierHashes is the array of document identifier hashes to look up.
- *@return the array of document data. Null will come back for any identifier that doesn't
- * exist in the index.
*/
@Override
- public DocumentIngestStatus[] getDocumentIngestDataMultiple(String[] outputConnectionNames,
+ public void getPipelineDocumentIngestDataMultiple(
+ Map<OutputKey,DocumentIngestStatus> rval,
+ IPipelineSpecificationBasic[] pipelineSpecificationBasics,
String[] identifierClasses, String[] identifierHashes)
throws ManifoldCFException
{
- // Segregate request by connection names
- HashMap keyMap = new HashMap();
- int i = 0;
- while (i < outputConnectionNames.length)
+ // Organize by pipeline spec.
+ Map<IPipelineSpecificationBasic,List<Integer>> keyMap = new HashMap<IPipelineSpecificationBasic,List<Integer>>();
+ for (int i = 0; i < pipelineSpecificationBasics.length; i++)
{
- String outputConnectionName = outputConnectionNames[i];
- ArrayList list = (ArrayList)keyMap.get(outputConnectionName);
+ IPipelineSpecificationBasic spec = pipelineSpecificationBasics[i];
+ List<Integer> list = keyMap.get(spec);
if (list == null)
{
- list = new ArrayList();
- keyMap.put(outputConnectionName,list);
+ list = new ArrayList<Integer>();
+ keyMap.put(spec,list);
}
list.add(new Integer(i));
- i++;
}
// Create the return array.
- DocumentIngestStatus[] rval = new DocumentIngestStatus[outputConnectionNames.length];
- Iterator iter = keyMap.keySet().iterator();
+ Iterator<IPipelineSpecificationBasic> iter = keyMap.keySet().iterator();
while (iter.hasNext())
{
- String outputConnectionName = (String)iter.next();
- ArrayList list = (ArrayList)keyMap.get(outputConnectionName);
+ IPipelineSpecificationBasic spec = iter.next();
+ List<Integer> list = keyMap.get(spec);
String[] localIdentifierClasses = new String[list.size()];
String[] localIdentifierHashes = new String[list.size()];
- i = 0;
- while (i < localIdentifierClasses.length)
+ for (int i = 0; i < localIdentifierClasses.length; i++)
{
- int index = ((Integer)list.get(i)).intValue();
+ int index = list.get(i).intValue();
localIdentifierClasses[i] = identifierClasses[index];
localIdentifierHashes[i] = identifierHashes[index];
- i++;
- }
- DocumentIngestStatus[] localRval = getDocumentIngestDataMultiple(outputConnectionName,localIdentifierClasses,localIdentifierHashes);
- i = 0;
- while (i < localRval.length)
- {
- int index = ((Integer)list.get(i)).intValue();
- rval[index] = localRval[i];
- i++;
}
+ getPipelineDocumentIngestDataMultiple(rval,spec,localIdentifierClasses,localIdentifierHashes);
}
- return rval;
}
/** Look up ingestion data for a SET of documents.
- *@param outputConnectionName is the names of the output connection associated with this action.
+ *@param rval is a map of output key to document data, in no particular order, which will be loaded with all matching results.
+ *@param pipelineSpecificationBasic is the pipeline specification for all documents.
*@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
*@param identifierHashes is the array of document identifier hashes to look up.
- *@return the array of document data. Null will come back for any identifier that doesn't
- * exist in the index.
*/
@Override
- public DocumentIngestStatus[] getDocumentIngestDataMultiple(String outputConnectionName,
+ public void getPipelineDocumentIngestDataMultiple(
+ Map<OutputKey,DocumentIngestStatus> rval,
+ IPipelineSpecificationBasic pipelineSpecificationBasic,
String[] identifierClasses, String[] identifierHashes)
throws ManifoldCFException
{
- // Build the return array
- DocumentIngestStatus[] rval = new DocumentIngestStatus[identifierHashes.length];
+ String[] outputConnectionNames = extractOutputConnectionNames(pipelineSpecificationBasic);
// Build a map, so we can convert an identifier into an array index.
- HashMap indexMap = new HashMap();
- int i = 0;
- while (i < identifierHashes.length)
+ Map<String,Integer> indexMap = new HashMap<String,Integer>();
+ for (int i = 0; i < identifierHashes.length; i++)
{
indexMap.put(makeKey(identifierClasses[i],identifierHashes[i]),new Integer(i));
- rval[i] = null;
- i++;
}
beginTransaction();
try
{
- ArrayList list = new ArrayList();
- int maxCount = maxClauseDocumentIngestDataChunk(outputConnectionName);
+ List<String> list = new ArrayList<String>();
+ int maxCount = maxClausePipelineDocumentIngestDataChunk(outputConnectionNames);
int j = 0;
- Iterator iter = indexMap.keySet().iterator();
+ Iterator<String> iter = indexMap.keySet().iterator();
while (iter.hasNext())
{
if (j == maxCount)
{
- getDocumentIngestDataChunk(rval,indexMap,outputConnectionName,list);
+ getPipelineDocumentIngestDataChunk(rval,indexMap,outputConnectionNames,list,identifierClasses,identifierHashes);
j = 0;
list.clear();
}
@@ -1612,8 +1817,7 @@ public class IncrementalIngester extends
j++;
}
if (j > 0)
- getDocumentIngestDataChunk(rval,indexMap,outputConnectionName,list);
- return rval;
+ getPipelineDocumentIngestDataChunk(rval,indexMap,outputConnectionNames,list,identifierClasses,identifierHashes);
}
catch (ManifoldCFException e)
{
@@ -1629,78 +1833,248 @@ public class IncrementalIngester extends
{
endTransaction();
}
+
}
- /** Look up ingestion data for a documents.
- *@param outputConnectionName is the name of the output connection associated with this action.
- *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
- *@param identifierHash is the hash of the id of the document.
- *@return the current document's ingestion data, or null if the document is not currently ingested.
+ /** Get a chunk of document ingest data records.
+ *@param rval is the map of output key to document ingest status where the data should be put.
+ *@param map is the map from document key to index into identifierClasses and identifierHashes.
+ *@param outputConnectionNames are the names of the output connections to match.
+ *@param list is the list of document keys to look up in this chunk.
*/
- @Override
- public DocumentIngestStatus getDocumentIngestData(String outputConnectionName,
- String identifierClass, String identifierHash)
+ protected void getPipelineDocumentIngestDataChunk(Map<OutputKey,DocumentIngestStatus> rval, Map<String,Integer> map, String[] outputConnectionNames, List<String> list,
+ String[] identifierClasses, String[] identifierHashes)
throws ManifoldCFException
{
- return getDocumentIngestDataMultiple(outputConnectionName,new String[]{identifierClass},new String[]{identifierHash})[0];
- }
-
- /** Calculate the average time interval between changes for a document.
- * This is based on the data gathered for the document.
+ ArrayList newList = new ArrayList();
+ String query = buildConjunctionClause(newList,new ClauseDescription[]{
+ new MultiClause(docKeyField,list),
+ new MultiClause(outputConnNameField,outputConnectionNames)});
+
+ // Get the primary records associated with this hash value
+ IResultSet set = performQuery("SELECT "+idField+","+outputConnNameField+","+docKeyField+","+lastVersionField+","+lastOutputVersionField+","+authorityNameField+","+forcedParamsField+","+lastTransformationVersionField+
+ " FROM "+getTableName()+" WHERE "+query,newList,null,null);
+
+ // Now, go through the returned rows, adding an entry to the result map for each requested document
+ for (int i = 0; i < set.getRowCount(); i++)
+ {
+ IResultRow row = set.getRow(i);
+ String docHash = row.getValue(docKeyField).toString();
+ Integer position = map.get(docHash);
+ if (position != null)
+ {
+ Long id = (Long)row.getValue(idField);
+ String outputConnectionName = (String)row.getValue(outputConnNameField);
+ String lastVersion = (String)row.getValue(lastVersionField);
+ if (lastVersion == null)
+ lastVersion = "";
+ String lastTransformationVersion = (String)row.getValue(lastTransformationVersionField);
+ if (lastTransformationVersion == null)
+ lastTransformationVersion = "";
+ String lastOutputVersion = (String)row.getValue(lastOutputVersionField);
+ if (lastOutputVersion == null)
+ lastOutputVersion = "";
+ String paramVersion = (String)row.getValue(forcedParamsField);
+ if (paramVersion == null)
+ paramVersion = "";
+ String authorityName = (String)row.getValue(authorityNameField);
+ if (authorityName == null)
+ authorityName = "";
+ int indexValue = position.intValue();
+ rval.put(new OutputKey(identifierClasses[indexValue],identifierHashes[indexValue],outputConnectionName),
+ new DocumentIngestStatus(lastVersion,lastTransformationVersion,lastOutputVersion,paramVersion,authorityName));
+ }
+ }
+ }
+
+ /** Look up ingestion data for a document.
+ *@param rval is a map of output key to document data, in no particular order, which will be loaded with all matching results.
+ *@param pipelineSpecificationBasic is the pipeline specification for the document.
+ *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
+ *@param identifierHash is the hash of the id of the document.
+ */
+ @Override
+ public void getPipelineDocumentIngestData(
+ Map<OutputKey,DocumentIngestStatus> rval,
+ IPipelineSpecificationBasic pipelineSpecificationBasic,
+ String identifierClass, String identifierHash)
+ throws ManifoldCFException
+ {
+ getPipelineDocumentIngestDataMultiple(rval,pipelineSpecificationBasic,
+ new String[]{identifierClass},new String[]{identifierHash});
+ }
+
+ /** Look up ingestion data for a SET of documents.
+ *@param outputConnectionNames are the names of the output connections associated with this action.
+ *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
+ *@param identifierHashes is the array of document identifier hashes to look up.
+ *@return the array of document data. Null will come back for any identifier that doesn't
+ * exist in the index.
+ */
+ @Override
+ @Deprecated
+ public DocumentIngestStatus[] getDocumentIngestDataMultiple(String[] outputConnectionNames,
+ String[] identifierClasses, String[] identifierHashes)
+ throws ManifoldCFException
+ {
+ // Segregate request by connection names
+ Map<String,List<Integer>> keyMap = new HashMap<String,List<Integer>>();
+ for (int i = 0; i < outputConnectionNames.length; i++)
+ {
+ String outputConnectionName = outputConnectionNames[i];
+ List<Integer> list = keyMap.get(outputConnectionName);
+ if (list == null)
+ {
+ list = new ArrayList<Integer>();
+ keyMap.put(outputConnectionName,list);
+ }
+ list.add(new Integer(i));
+ }
+
+ // Create the return array.
+ DocumentIngestStatus[] rval = new DocumentIngestStatus[outputConnectionNames.length];
+ Iterator<String> iter = keyMap.keySet().iterator();
+ while (iter.hasNext())
+ {
+ String outputConnectionName = iter.next();
+ List<Integer> list = keyMap.get(outputConnectionName);
+ String[] localIdentifierClasses = new String[list.size()];
+ String[] localIdentifierHashes = new String[list.size()];
+ for (int i = 0; i < localIdentifierClasses.length; i++)
+ {
+ int index = list.get(i).intValue();
+ localIdentifierClasses[i] = identifierClasses[index];
+ localIdentifierHashes[i] = identifierHashes[index];
+ }
+ DocumentIngestStatus[] localRval = getDocumentIngestDataMultiple(outputConnectionName,localIdentifierClasses,localIdentifierHashes);
+ for (int i = 0; i < localRval.length; i++)
+ {
+ int index = list.get(i).intValue();
+ rval[index] = localRval[i];
+ }
+ }
+ return rval;
+ }
+
+ /** Look up ingestion data for a SET of documents.
+ *@param outputConnectionName is the name of the output connection associated with this action.
+ *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
+ *@param identifierHashes is the array of document identifier hashes to look up.
+ *@return the array of document data. Null will come back for any identifier that doesn't
+ * exist in the index.
+ */
+ @Override
+ @Deprecated
+ public DocumentIngestStatus[] getDocumentIngestDataMultiple(String outputConnectionName,
+ String[] identifierClasses, String[] identifierHashes)
+ throws ManifoldCFException
+ {
+ // Build the return array
+ DocumentIngestStatus[] rval = new DocumentIngestStatus[identifierHashes.length];
+
+ // Build a map, so we can convert an identifier into an array index.
+ Map<String,Integer> indexMap = new HashMap<String,Integer>();
+ for (int i = 0; i < identifierHashes.length; i++)
+ {
+ indexMap.put(makeKey(identifierClasses[i],identifierHashes[i]),new Integer(i));
+ rval[i] = null;
+ }
+
+ beginTransaction();
+ try
+ {
+ List<String> list = new ArrayList<String>();
+ int maxCount = maxClauseDocumentIngestDataChunk(outputConnectionName);
+ int j = 0;
+ Iterator<String> iter = indexMap.keySet().iterator();
+ while (iter.hasNext())
+ {
+ if (j == maxCount)
+ {
+ getDocumentIngestDataChunk(rval,indexMap,outputConnectionName,list);
+ j = 0;
+ list.clear();
+ }
+ list.add(iter.next());
+ j++;
+ }
+ if (j > 0)
+ getDocumentIngestDataChunk(rval,indexMap,outputConnectionName,list);
+ return rval;
+ }
+ catch (ManifoldCFException e)
+ {
+ signalRollback();
+ throw e;
+ }
+ catch (Error e)
+ {
+ signalRollback();
+ throw e;
+ }
+ finally
+ {
+ endTransaction();
+ }
+ }
+
+ /** Look up ingestion data for a document.
*@param outputConnectionName is the name of the output connection associated with this action.
*@param identifierClass is the name of the space in which the identifier hash should be interpreted.
*@param identifierHash is the hash of the id of the document.
- *@return the number of milliseconds between changes, or 0 if this cannot be calculated.
+ *@return the current document's ingestion data, or null if the document is not currently ingested.
*/
@Override
- public long getDocumentUpdateInterval(String outputConnectionName,
+ @Deprecated
+ public DocumentIngestStatus getDocumentIngestData(String outputConnectionName,
String identifierClass, String identifierHash)
throws ManifoldCFException
{
- return getDocumentUpdateIntervalMultiple(outputConnectionName,new String[]{identifierClass},new String[]{identifierHash})[0];
+ return getDocumentIngestDataMultiple(outputConnectionName,new String[]{identifierClass},new String[]{identifierHash})[0];
}
/** Calculate the average time interval between changes for a document.
* This is based on the data gathered for the document.
- *@param outputConnectionName is the name of the output connection associated with this action.
+ *@param pipelineSpecificationBasic is the basic pipeline specification.
*@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
*@param identifierHashes is the hashes of the ids of the documents.
*@return the number of milliseconds between changes, or 0 if this cannot be calculated.
*/
- @Override
- public long[] getDocumentUpdateIntervalMultiple(String outputConnectionName,
+ public long[] getDocumentUpdateIntervalMultiple(
+ IPipelineSpecificationBasic pipelineSpecificationBasic,
String[] identifierClasses, String[] identifierHashes)
[... 1595 lines stripped ...]