You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/06/08 13:44:19 UTC
svn commit: r1601214 - in /manifoldcf/branches/CONNECTORS-946/framework:
agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/
agents/src/main/java/org/apache/manifoldcf/agents/interfaces/
pull-agent/src/main/java/org/apache/manifoldcf/c...
Author: kwright
Date: Sun Jun 8 11:44:19 2014
New Revision: 1601214
URL: http://svn.apache.org/r1601214
Log:
Finish incremental ingester; begin finishing up registration notification
Modified:
manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/DocumentIngestStatus.java
manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java
manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/PipelineManager.java
manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
Modified: manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java?rev=1601214&r1=1601213&r2=1601214&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java (original)
+++ manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java Sun Jun 8 11:44:19 2014
@@ -46,6 +46,7 @@ import java.io.*;
* <tr><td>urihash</td><td>VARCHAR(40)</td><td></td></tr>
* <tr><td>lastversion</td><td>LONGTEXT</td><td></td></tr>
* <tr><td>lastoutputversion</td><td>LONGTEXT</td><td></td></tr>
+* <tr><td>lasttransformationversion</td><td>LONGTEXT</td><td></td></tr>
* <tr><td>forcedparams</td><td>LONGTEXT</td><td></td></tr>
* <tr><td>changecount</td><td>BIGINT</td><td></td></tr>
* <tr><td>firstingest</td><td>BIGINT</td><td></td></tr>
@@ -67,6 +68,7 @@ public class IncrementalIngester extends
protected final static String uriHashField = "urihash";
protected final static String lastVersionField = "lastversion";
protected final static String lastOutputVersionField = "lastoutputversion";
+ protected final static String lastTransformationVersionField = "lasttransformationversion";
protected final static String forcedParamsField = "forcedparams";
protected final static String changeCountField = "changecount";
protected final static String firstIngestField = "firstingest";
@@ -126,6 +128,7 @@ public class IncrementalIngester extends
map.put(uriHashField,new ColumnDescription("VARCHAR(40)",false,true,null,null,false));
map.put(lastVersionField,new ColumnDescription("LONGTEXT",false,true,null,null,false));
map.put(lastOutputVersionField,new ColumnDescription("LONGTEXT",false,true,null,null,false));
+ map.put(lastTransformationVersionField,new ColumnDescription("LONGTEXT",false,true,null,null,false));
map.put(forcedParamsField,new ColumnDescription("LONGTEXT",false,true,null,null,false));
map.put(changeCountField,new ColumnDescription("BIGINT",false,false,null,null,false));
map.put(firstIngestField,new ColumnDescription("BIGINT",false,false,null,null,false));
@@ -143,6 +146,16 @@ public class IncrementalIngester extends
addMap.put(forcedParamsField,new ColumnDescription("LONGTEXT",false,true,null,null,false));
performAlter(addMap,null,null,null);
}
+
+ // Schema upgrade from 1.6 to 1.7
+ cd = (ColumnDescription)existing.get(lastTransformationVersionField);
+ if (cd == null)
+ {
+ Map<String,ColumnDescription> addMap = new HashMap<String,ColumnDescription>();
+ addMap.put(lastTransformationVersionField,new ColumnDescription("LONGTEXT",false,true,null,null,false));
+ performAlter(addMap,null,null,null);
+ }
+
}
// Now, do indexes
@@ -235,7 +248,10 @@ public class IncrementalIngester extends
IOutputCheckActivity activity)
throws ManifoldCFException, ServiceInterruption
{
- PipelineObject pipeline = pipelineGrab(transformationConnectionNames,outputConnectionName,transformationDescriptions,outputDescription);
+ PipelineObject pipeline = pipelineGrab(
+ transformationConnectionManager.loadMultiple(transformationConnectionNames),
+ connectionManager.load(outputConnectionName),
+ transformationDescriptions,outputDescription);
if (pipeline == null)
// A connector is not installed; treat this as a service interruption.
throw new ServiceInterruption("One or more connectors are not installed",0L);
@@ -283,7 +299,10 @@ public class IncrementalIngester extends
IOutputCheckActivity activity)
throws ManifoldCFException, ServiceInterruption
{
- PipelineObject pipeline = pipelineGrab(transformationConnectionNames,outputConnectionName,transformationDescriptions,outputDescription);
+ PipelineObject pipeline = pipelineGrab(
+ transformationConnectionManager.loadMultiple(transformationConnectionNames),
+ connectionManager.load(outputConnectionName),
+ transformationDescriptions,outputDescription);
if (pipeline == null)
// A connector is not installed; treat this as a service interruption.
throw new ServiceInterruption("One or more connectors are not installed",0L);
@@ -331,7 +350,10 @@ public class IncrementalIngester extends
IOutputCheckActivity activity)
throws ManifoldCFException, ServiceInterruption
{
- PipelineObject pipeline = pipelineGrab(transformationConnectionNames,outputConnectionName,transformationDescriptions,outputDescription);
+ PipelineObject pipeline = pipelineGrab(
+ transformationConnectionManager.loadMultiple(transformationConnectionNames),
+ connectionManager.load(outputConnectionName),
+ transformationDescriptions,outputDescription);
if (pipeline == null)
// A connector is not installed; treat this as a service interruption.
throw new ServiceInterruption("One or more connectors are not installed",0L);
@@ -379,7 +401,10 @@ public class IncrementalIngester extends
IOutputCheckActivity activity)
throws ManifoldCFException, ServiceInterruption
{
- PipelineObject pipeline = pipelineGrab(transformationConnectionNames,outputConnectionName,transformationDescriptions,outputDescription);
+ PipelineObject pipeline = pipelineGrab(
+ transformationConnectionManager.loadMultiple(transformationConnectionNames),
+ connectionManager.load(outputConnectionName),
+ transformationDescriptions,outputDescription);
if (pipeline == null)
// A connector is not installed; treat this as a service interruption.
throw new ServiceInterruption("One or more connectors are not installed",0L);
@@ -395,22 +420,21 @@ public class IncrementalIngester extends
/** Grab the entire pipeline.
- *@param transformationConnectionNames - the names of the transformation connections, in order
- *@param outputConnectionName - the name of the output connection
+ *@param transformationConnections - the transformation connections, in order
+ *@param outputConnection - the output connection
*@param transformationDescriptionStrings - the array of description strings for transformations
*@param outputDescriptionString - the output description string
*@return the pipeline description, or null if any part of the pipeline cannot be grabbed.
*/
- protected PipelineObject pipelineGrab(String[] transformationConnectionNames, String outputConnectionName,
+ protected PipelineObject pipelineGrab(ITransformationConnection[] transformationConnections, IOutputConnection outputConnection,
String[] transformationDescriptionStrings, String outputDescriptionString)
throws ManifoldCFException
{
// Pick up all needed transformation connectors
- ITransformationConnection[] transformationConnections = new ITransformationConnection[transformationConnectionNames.length];
-
+ String[] transformationConnectionNames = new String[transformationConnections.length];
for (int i = 0; i < transformationConnections.length; i++)
{
- transformationConnections[i] = transformationConnectionManager.load(transformationConnectionNames[i]);
+ transformationConnectionNames[i] = transformationConnections[i].getName();
}
ITransformationConnector[] transformationConnectors = transformationConnectorPool.grabMultiple(transformationConnectionNames,transformationConnections);
@@ -426,14 +450,13 @@ public class IncrementalIngester extends
// Last, pick up output connector. If it fails we have to release the transformation connectors.
try
{
- IOutputConnection connection = connectionManager.load(outputConnectionName);
- IOutputConnector connector = outputConnectorPool.grab(connection);
- if (connector == null)
+ IOutputConnector outputConnector = outputConnectorPool.grab(outputConnection);
+ if (outputConnector == null)
{
transformationConnectorPool.releaseMultiple(transformationConnections,transformationConnectors);
return null;
}
- return new PipelineObject(transformationConnections,transformationConnectors,connection,connector,
+ return new PipelineObject(transformationConnections,transformationConnectors,outputConnection,outputConnector,
transformationDescriptionStrings,outputDescriptionString);
}
catch (Throwable e)
@@ -530,9 +553,9 @@ public class IncrementalIngester extends
Logging.ingest.debug("Recording document '"+docKey+"' for output connection '"+outputConnectionName+"'");
}
- performIngestion(new String[0],new String[0],
- outputConnectionName,null,
- docKey,documentVersion,null,
+ performIngestion(new ITransformationConnection[0],new String[0],
+ connectionManager.load(outputConnectionName),null,
+ docKey,documentVersion,null,null,null,
null,
null,
recordTime,
@@ -612,10 +635,12 @@ public class IncrementalIngester extends
throws ManifoldCFException, ServiceInterruption
{
return documentIngest(new String[0],
+ new String[0],
outputConnectionName,
+ outputVersion,
identifierClass, identifierHash,
documentVersion,
- new String[0],
+ "",
outputVersion,
parameterVersion,
authorityName,
@@ -630,13 +655,15 @@ public class IncrementalIngester extends
* described by the RepositoryDocument object passed to this method.
* ServiceInterruption is thrown if the document ingestion must be rescheduled.
*@param transformationConnectionNames are the names of the transformation connections associated with this action.
+ *@param transformationDescriptionStrings are the description strings corresponding to the transformation connection names.
*@param outputConnectionName is the name of the output connection associated with this action.
+ *@param outputDescriptionString is the description string corresponding to the output connection.
*@param identifierClass is the name of the space in which the identifier hash should be interpreted.
*@param identifierHash is the hashed document identifier.
*@param documentVersion is the document version.
- *@param parameterVersion is the forced parameter version.
- *@param transformationVersions are the version strings for the transformations to be performed on the document.
- *@param outputVersion is the output version string constructed from the output specification by the output connector.
+ *@param transformationVersion is the version string for the transformations to be performed on the document.
+ *@param outputVersion is the output version string for the output connection.
+ *@param parameterVersion is the version string for the forced parameters.
*@param authorityName is the name of the authority associated with the document, if any.
*@param data is the document data. The data is closed after ingestion is complete.
*@param ingestTime is the time at which the ingestion took place, in milliseconds since epoch.
@@ -646,10 +673,12 @@ public class IncrementalIngester extends
*/
public boolean documentIngest(
String[] transformationConnectionNames,
+ String[] transformationDescriptionStrings,
String outputConnectionName,
+ String outputDescriptionString,
String identifierClass, String identifierHash,
String documentVersion,
- String[] transformationVersions,
+ String transformationVersion,
String outputVersion,
String parameterVersion,
String authorityName,
@@ -658,15 +687,18 @@ public class IncrementalIngester extends
IOutputActivity activities)
throws ManifoldCFException, ServiceInterruption
{
+ IOutputConnection outputConnection = connectionManager.load(outputConnectionName);
+ ITransformationConnection[] transformationConnections = transformationConnectionManager.loadMultiple(transformationConnectionNames);
+
String docKey = makeKey(identifierClass,identifierHash);
if (Logging.ingest.isDebugEnabled())
{
Logging.ingest.debug("Ingesting document '"+docKey+"' into output connection '"+outputConnectionName+"'");
}
- return performIngestion(transformationConnectionNames,transformationVersions,
- outputConnectionName,outputVersion,
- docKey,documentVersion,parameterVersion,
+ return performIngestion(transformationConnections,transformationDescriptionStrings,
+ outputConnection,outputDescriptionString,
+ docKey,documentVersion,outputVersion,transformationVersion,parameterVersion,
authorityName,
data,
ingestTime,documentURI,
@@ -675,15 +707,17 @@ public class IncrementalIngester extends
/** Do the actual ingestion, or just record it if there's nothing to ingest. */
protected boolean performIngestion(
- String[] transformationConnectionNames, String[] transformationVersions,
- String outputConnectionName, String outputVersion,
- String docKey, String documentVersion, String parameterVersion,
+ ITransformationConnection[] transformationConnections, String[] transformationDescriptionStrings,
+ IOutputConnection outputConnection, String outputDescriptionString,
+ String docKey, String documentVersion, String outputVersion, String transformationVersion, String parameterVersion,
String authorityNameString,
RepositoryDocument data,
long ingestTime, String documentURI,
IOutputActivity activities)
throws ManifoldCFException, ServiceInterruption
{
+ String outputConnectionName = outputConnection.getName();
+
// No transactions; not safe because post may take too much time
// First, calculate a document uri hash value
@@ -770,8 +804,7 @@ public class IncrementalIngester extends
new UnitaryClause(outputConnNameField,"=",outputConnectionName)});
list.add(docKey);
performDelete("WHERE "+query+" AND "+docKeyField+"!=?",list,null);
- IOutputConnection connection = connectionManager.load(outputConnectionName);
- removeDocument(connection,oldURI,oldOutputVersion,new OutputRemoveActivitiesWrapper(activities,outputConnectionName));
+ removeDocument(outputConnection,oldURI,oldOutputVersion,new OutputRemoveActivitiesWrapper(activities,outputConnectionName));
}
if (documentURI != null)
@@ -805,21 +838,18 @@ public class IncrementalIngester extends
// This is a marker that says "something is there"; it has an empty version, which indicates
// that we don't know anything about it. That means it will be reingested when the
// next version comes along, and will be deleted if called for also.
- // MHL -- needs to record info about transformations!!
- noteDocumentIngest(outputConnectionName,docKey,null,null,null,null,ingestTime,documentURI,documentURIHash);
- int result = addOrReplaceDocument(transformationConnectionNames,transformationVersions,
- outputConnectionName,outputVersion,
+ noteDocumentIngest(outputConnectionName,docKey,null,null,null,null,null,ingestTime,documentURI,documentURIHash);
+ int result = addOrReplaceDocument(transformationConnections,transformationDescriptionStrings,
+ outputConnection,outputDescriptionString,
documentURI,data,authorityNameString,
activities);
- // MHL -- needs to record info about transformations!!
- noteDocumentIngest(outputConnectionName,docKey,documentVersion,outputVersion,parameterVersion,authorityNameString,ingestTime,documentURI,documentURIHash);
+ noteDocumentIngest(outputConnectionName,docKey,documentVersion,transformationVersion, outputVersion,parameterVersion,authorityNameString,ingestTime,documentURI,documentURIHash);
return result == IOutputConnector.DOCUMENTSTATUS_ACCEPTED;
}
// If we get here, it means we are noting that the document was examined, but that no change was required. This is signaled
// to noteDocumentIngest by having the null documentURI.
- // MHL -- needs to record info about transformations!!
- noteDocumentIngest(outputConnectionName,docKey,documentVersion,outputVersion,parameterVersion,authorityNameString,ingestTime,null,null);
+ noteDocumentIngest(outputConnectionName,docKey,documentVersion,transformationVersion,outputVersion,parameterVersion,authorityNameString,ingestTime,null,null);
return true;
}
finally
@@ -1670,6 +1700,7 @@ public class IncrementalIngester extends
*@param outputConnectionName is the name of the output connection.
*@param docKey is the key string describing the document.
*@param documentVersion is a string describing the new version of the document.
+ *@param transformationVersion is a string describing all current transformations for the document.
*@param outputVersion is the version string calculated for the output connection.
*@param authorityNameString is the name of the relevant authority connection.
*@param packedForcedParameters is the string we use to determine differences in packed parameters.
@@ -1679,7 +1710,7 @@ public class IncrementalIngester extends
*@param documentURIHash is the hash of the document uri.
*/
protected void noteDocumentIngest(String outputConnectionName,
- String docKey, String documentVersion,
+ String docKey, String documentVersion, String transformationVersion,
String outputVersion, String packedForcedParameters,
String authorityNameString,
long ingestTime, String documentURI, String documentURIHash)
@@ -1709,6 +1740,7 @@ public class IncrementalIngester extends
// Try the update first. Typically this succeeds except in the case where a doc is indexed for the first time.
map.clear();
map.put(lastVersionField,documentVersion);
+ map.put(lastTransformationVersionField,transformationVersion);
map.put(lastOutputVersionField,outputVersion);
map.put(forcedParamsField,packedForcedParameters);
map.put(lastIngestField,new Long(ingestTime));
@@ -1787,6 +1819,7 @@ public class IncrementalIngester extends
// Set up for insert
map.clear();
map.put(lastVersionField,documentVersion);
+ map.put(lastTransformationVersionField,transformationVersion);
map.put(lastOutputVersionField,outputVersion);
map.put(forcedParamsField,packedForcedParameters);
map.put(lastIngestField,new Long(ingestTime));
@@ -1918,16 +1951,19 @@ public class IncrementalIngester extends
String lastVersion = (String)row.getValue(lastVersionField);
if (lastVersion == null)
lastVersion = "";
+ String lastTransformationVersion = (String)row.getValue(lastTransformationVersionField);
+ if (lastTransformationVersion == null)
+ lastTransformationVersion = "";
String lastOutputVersion = (String)row.getValue(lastOutputVersionField);
if (lastOutputVersion == null)
lastOutputVersion = "";
- String authorityName = (String)row.getValue(authorityNameField);
- if (authorityName == null)
- authorityName = "";
String paramVersion = (String)row.getValue(forcedParamsField);
if (paramVersion == null)
paramVersion = "";
- rval[position.intValue()] = new DocumentIngestStatus(lastVersion,new String[0],lastOutputVersion,authorityName,paramVersion,new String[0]);
+ String authorityName = (String)row.getValue(authorityNameField);
+ if (authorityName == null)
+ authorityName = "";
+ rval[position.intValue()] = new DocumentIngestStatus(lastVersion,lastTransformationVersion,lastOutputVersion,paramVersion,authorityName);
}
}
}
@@ -1937,8 +1973,8 @@ public class IncrementalIngester extends
/** Add or replace document, using the specified output connection, via the standard pool.
*/
protected int addOrReplaceDocument(
- String[] transformationConnectionNames, String[] transformationDescriptionStrings,
- String outputConnectionName, String outputDescriptionString,
+ ITransformationConnection[] transformationConnections, String[] transformationDescriptionStrings,
+ IOutputConnection outputConnection, String outputDescriptionString,
String documentURI, RepositoryDocument document, String authorityNameString,
IOutputAddActivity finalActivities)
throws ManifoldCFException, ServiceInterruption
@@ -1947,7 +1983,7 @@ public class IncrementalIngester extends
document.setIndexingDate(new Date());
// Set up a pipeline
- PipelineObject pipeline = pipelineGrab(transformationConnectionNames,outputConnectionName,
+ PipelineObject pipeline = pipelineGrab(transformationConnections,outputConnection,
transformationDescriptionStrings,outputDescriptionString);
if (pipeline == null)
// A connector is not installed; treat this as a service interruption.
Modified: manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/DocumentIngestStatus.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/DocumentIngestStatus.java?rev=1601214&r1=1601213&r2=1601214&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/DocumentIngestStatus.java (original)
+++ manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/DocumentIngestStatus.java Sun Jun 8 11:44:19 2014
@@ -32,24 +32,21 @@ public class DocumentIngestStatus
public static final String _rcsid = "@(#)$Id: DocumentIngestStatus.java 988245 2010-08-23 18:39:35Z kwright $";
protected final String documentVersionString;
- protected final String[] transformationVersionStrings;
+ protected final String transformationVersionString;
protected final String outputVersionString;
- protected final String documentAuthorityNameString;
protected final String parameterVersionString;
- protected final String[] transformationNameStrings;
+ protected final String documentAuthorityNameString;
/** Constructor */
public DocumentIngestStatus(String documentVersionString,
- String[] transformationVersionStrings, String outputVersionString,
- String documentAuthorityNameString, String parameterVersionString,
- String[] transformationNameStrings)
+ String transformationVersionString, String outputVersionString, String parameterVersionString,
+ String documentAuthorityNameString)
{
this.documentVersionString = documentVersionString;
- this.transformationVersionStrings = transformationVersionStrings;
+ this.transformationVersionString = transformationVersionString;
this.outputVersionString = outputVersionString;
- this.documentAuthorityNameString = documentAuthorityNameString;
this.parameterVersionString = parameterVersionString;
- this.transformationNameStrings = transformationNameStrings;
+ this.documentAuthorityNameString = documentAuthorityNameString;
}
/** Get the document version */
@@ -58,16 +55,10 @@ public class DocumentIngestStatus
return documentVersionString;
}
- /** Get the transformation name strings */
- public String[] getTransformationNameStrings()
- {
- return transformationNameStrings;
- }
-
/** Get the transformation version string */
- public String[] getTransformationVersions()
+ public String getTransformationVersion()
{
- return transformationVersionStrings;
+ return transformationVersionString;
}
/** Get the output version */
@@ -76,16 +67,16 @@ public class DocumentIngestStatus
return outputVersionString;
}
- /** Get the document authority name string */
- public String getDocumentAuthorityNameString()
- {
- return documentAuthorityNameString;
- }
-
/** Get the parameter version string */
public String getParameterVersion()
{
return parameterVersionString;
}
+ /** Get the document authority name string */
+ public String getDocumentAuthorityNameString()
+ {
+ return documentAuthorityNameString;
+ }
+
}
Modified: manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java?rev=1601214&r1=1601213&r2=1601214&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java (original)
+++ manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java Sun Jun 8 11:44:19 2014
@@ -258,13 +258,15 @@ public interface IIncrementalIngester
* described by the RepositoryDocument object passed to this method.
* ServiceInterruption is thrown if the document ingestion must be rescheduled.
*@param transformationConnectionNames are the names of the transformation connections associated with this action.
+ *@param transformationDescriptionStrings are the description strings corresponding to the transformation connection names.
*@param outputConnectionName is the name of the output connection associated with this action.
+ *@param outputDescriptionString is the description string corresponding to the output connection.
*@param identifierClass is the name of the space in which the identifier hash should be interpreted.
*@param identifierHash is the hashed document identifier.
*@param documentVersion is the document version.
- *@param parameterVersion is the forced parameter version.
- *@param transformationVersions are the version strings for the transformations to be performed on the document.
- *@param outputVersion is the output version string constructed from the output specification by the output connector.
+ *@param transformationVersion is the version string for the transformations to be performed on the document.
+ *@param outputVersion is the output version string for the output connection.
+ *@param parameterVersion is the version string for the forced parameters.
*@param authorityName is the name of the authority associated with the document, if any.
*@param data is the document data. The data is closed after ingestion is complete.
*@param ingestTime is the time at which the ingestion took place, in milliseconds since epoch.
@@ -274,10 +276,12 @@ public interface IIncrementalIngester
*/
public boolean documentIngest(
String[] transformationConnectionNames,
+ String[] transformationDescriptionStrings,
String outputConnectionName,
+ String outputDescriptionString,
String identifierClass, String identifierHash,
String documentVersion,
- String[] transformationVersions,
+ String transformationVersion,
String outputVersion,
String parameterVersion,
String authorityName,
Modified: manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1601214&r1=1601213&r2=1601214&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java (original)
+++ manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Sun Jun 8 11:44:19 2014
@@ -319,7 +319,7 @@ public class JobManager implements IJobM
throws ManifoldCFException
{
// For each connection, find the corresponding list of jobs. From these jobs, we want the job id and the status.
- ArrayList list = new ArrayList();
+ List<String> list = new ArrayList<String>();
int maxCount = database.findConjunctionClauseMax(new ClauseDescription[]{});
int currentCount = 0;
int i = 0;
@@ -341,7 +341,7 @@ public class JobManager implements IJobM
/** Note deregistration for a batch of connection names.
*/
- protected void noteConnectionDeregistration(ArrayList list)
+ protected void noteConnectionDeregistration(List<String> list)
throws ManifoldCFException
{
ArrayList newList = new ArrayList();
@@ -371,7 +371,7 @@ public class JobManager implements IJobM
throws ManifoldCFException
{
// For each connection, find the corresponding list of jobs. From these jobs, we want the job id and the status.
- ArrayList list = new ArrayList();
+ List<String> list = new ArrayList<String>();
int maxCount = database.findConjunctionClauseMax(new ClauseDescription[]{});
int currentCount = 0;
int i = 0;
@@ -393,7 +393,7 @@ public class JobManager implements IJobM
/** Note registration for a batch of connection names.
*/
- protected void noteConnectionRegistration(ArrayList list)
+ protected void noteConnectionRegistration(List<String> list)
throws ManifoldCFException
{
// Query for the matching jobs, and then for each job potentially adjust the state
@@ -413,17 +413,6 @@ public class JobManager implements IJobM
}
}
- /** Note a change in connection configuration.
- * This method will be called whenever a connection's configuration is modified, or when an external repository change
- * is signalled.
- */
- @Override
- public void noteConnectionChange(String connectionName)
- throws ManifoldCFException
- {
- jobs.noteConnectionChange(connectionName);
- }
-
/** Note the deregistration of an output connector used by the specified connections.
* This method will be called when the connector is deregistered. Jobs that use these connections
* must therefore enter appropriate states.
@@ -434,7 +423,7 @@ public class JobManager implements IJobM
throws ManifoldCFException
{
// For each connection, find the corresponding list of jobs. From these jobs, we want the job id and the status.
- ArrayList list = new ArrayList();
+ List<String> list = new ArrayList<String>();
int maxCount = database.findConjunctionClauseMax(new ClauseDescription[]{});
int currentCount = 0;
int i = 0;
@@ -456,7 +445,7 @@ public class JobManager implements IJobM
/** Note deregistration for a batch of output connection names.
*/
- protected void noteOutputConnectionDeregistration(ArrayList list)
+ protected void noteOutputConnectionDeregistration(List<String> list)
throws ManifoldCFException
{
ArrayList newList = new ArrayList();
@@ -486,7 +475,7 @@ public class JobManager implements IJobM
throws ManifoldCFException
{
// For each connection, find the corresponding list of jobs. From these jobs, we want the job id and the status.
- ArrayList list = new ArrayList();
+ List<String> list = new ArrayList<String>();
int maxCount = database.findConjunctionClauseMax(new ClauseDescription[]{});
int currentCount = 0;
int i = 0;
@@ -508,7 +497,7 @@ public class JobManager implements IJobM
/** Note registration for a batch of output connection names.
*/
- protected void noteOutputConnectionRegistration(ArrayList list)
+ protected void noteOutputConnectionRegistration(List<String> list)
throws ManifoldCFException
{
ArrayList newList = new ArrayList();
@@ -537,8 +526,46 @@ public class JobManager implements IJobM
public void noteTransformationConnectorDeregistration(String[] connectionNames)
throws ManifoldCFException
{
- // This is problematic; we need a different bit in the job state for every transformation in the job pipeline
- // MHL
+ // Process the connection names in batches of at most database.findConjunctionClauseMax() names.
+ // For each batch we find the matching jobs; from these jobs, we want the job id and the status.
+ List<String> list = new ArrayList<String>();
+ int maxCount = database.findConjunctionClauseMax(new ClauseDescription[]{});
+ int currentCount = 0;
+ int i = 0;
+ while (i < connectionNames.length)
+ {
+ if (currentCount == maxCount)
+ {
+ // Batch is full: flush it through the transformation-specific handler (not the
+ // noteConnectionDeregistration() handler) before starting the next batch.
+ noteTransformationConnectionDeregistration(list);
+ list.clear();
+ currentCount = 0;
+ }
+
+ list.add(connectionNames[i++]);
+ currentCount++;
+ }
+ // Flush the final, possibly partial, batch.
+ if (currentCount > 0)
+ noteTransformationConnectionDeregistration(list);
+ }
+
+ /** Signal transformation connector deregistration to each job matching one batch of
+ * transformation connection names.
+ *@param list is the batch of transformation connection names.
+ */
+ protected void noteTransformationConnectionDeregistration(List<String> list)
+ throws ManifoldCFException
+ {
+ // Build the job-matching query (with FOR UPDATE appended) and run it
+ ArrayList queryParams = new ArrayList();
+ StringBuilder sb = new StringBuilder();
+ jobs.buildTransformationMatchingQuery(sb,queryParams,list);
+ sb.append(" FOR UPDATE");
+ IResultSet results = database.performQuery(sb.toString(),queryParams,null,null);
+ // Pass each returned job id and its current status on to the jobs manager
+ for (int rowIndex = 0; rowIndex < results.getRowCount(); rowIndex++)
+ {
+ IResultRow currentRow = results.getRow(rowIndex);
+ Long jobID = (Long)currentRow.getValue(jobs.idField);
+ String statusString = (String)currentRow.getValue(jobs.statusField);
+ jobs.noteTransformationConnectorDeregistration(jobID,jobs.stringToStatus(statusString));
+ }
}
/** Note the registration of a transformation connector used by the specified connections.
@@ -550,8 +577,57 @@ public class JobManager implements IJobM
public void noteTransformationConnectorRegistration(String[] connectionNames)
throws ManifoldCFException
{
- // This is problematic; we need a different bit in the job state for every transformation in the job pipeline
- // MHL
+ // Process the connection names in batches of at most database.findConjunctionClauseMax() names.
+ // For each batch we find the matching jobs; from these jobs, we want the job id and the status.
+ List<String> list = new ArrayList<String>();
+ int maxCount = database.findConjunctionClauseMax(new ClauseDescription[]{});
+ int currentCount = 0;
+ int i = 0;
+ while (i < connectionNames.length)
+ {
+ if (currentCount == maxCount)
+ {
+ // Batch is full: flush it through the transformation *registration* handler.
+ // (A deregistration handler here would send full batches down the wrong path.)
+ noteTransformationConnectionRegistration(list);
+ list.clear();
+ currentCount = 0;
+ }
+
+ list.add(connectionNames[i++]);
+ currentCount++;
+ }
+ // Flush the final, possibly partial, batch.
+ if (currentCount > 0)
+ noteTransformationConnectionRegistration(list);
+ }
+
+ /** Signal transformation connector registration to each job matching one batch of
+ * transformation connection names.
+ *@param list is the batch of transformation connection names.
+ */
+ protected void noteTransformationConnectionRegistration(List<String> list)
+ throws ManifoldCFException
+ {
+ // Build the job-matching query (with FOR UPDATE appended) and run it
+ ArrayList queryParams = new ArrayList();
+ StringBuilder sb = new StringBuilder();
+ jobs.buildTransformationMatchingQuery(sb,queryParams,list);
+ sb.append(" FOR UPDATE");
+ IResultSet results = database.performQuery(sb.toString(),queryParams,null,null);
+ // Pass each returned job id and its current status on to the jobs manager
+ for (int rowIndex = 0; rowIndex < results.getRowCount(); rowIndex++)
+ {
+ IResultRow currentRow = results.getRow(rowIndex);
+ Long jobID = (Long)currentRow.getValue(jobs.idField);
+ String statusString = (String)currentRow.getValue(jobs.statusField);
+ jobs.noteTransformationConnectorRegistration(jobID,jobs.stringToStatus(statusString));
+ }
+ }
+
+ /** Note a change in connection configuration.
+ * This method will be called whenever a connection's configuration is modified, or when an external repository change
+ * is signalled. The notification is delegated unchanged to jobs.noteConnectionChange().
+ *@param connectionName is the name of the connection that changed.
+ */
+ @Override
+ public void noteConnectionChange(String connectionName)
+ throws ManifoldCFException
+ {
+ jobs.noteConnectionChange(connectionName);
}
/** Note a change in output connection configuration.
Modified: manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java?rev=1601214&r1=1601213&r2=1601214&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java (original)
+++ manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/Jobs.java Sun Jun 8 11:44:19 2014
@@ -57,6 +57,7 @@ import java.util.*;
* <tr><td>processid</td><td>VARCHAR(16)</td><td></td></tr>
* <tr><td>failtime</td><td>BIGINT</td><td></td></tr>
* <tr><td>failcount</td><td>BIGINT</td><td></td></tr>
+ * <tr><td>assessmentstate</td><td>CHAR(1)</td><td></td></tr>
* </table>
* <br><br>
*
@@ -124,12 +125,18 @@ public class Jobs extends org.apache.man
// connector exists before deciding what state to put the job into.
public static final int STATUS_ACTIVE_UNINSTALLED = 38; // Active, but repository connector not installed
public static final int STATUS_ACTIVESEEDING_UNINSTALLED = 39; // Active and seeding, but repository connector not installed
- public static final int STATUS_ACTIVE_NOOUTPUT = 40; // Active, but output connector not installed
- public static final int STATUS_ACTIVESEEDING_NOOUTPUT = 41; // Active and seeding, but output connector not installed
- public static final int STATUS_ACTIVE_NEITHER = 42; // Active, but neither repository connector nor output connector installed
- public static final int STATUS_ACTIVESEEDING_NEITHER = 43; // Active and seeding, but neither repository connector nor output connector installed
- public static final int STATUS_DELETING_NOOUTPUT = 44; // Job is being deleted but there's no output connector installed
+ public static final int STATUS_DELETING_NOOUTPUT = 40; // Job is being deleted but there's no output connector installed
+ // Deprecated states. These states should never be used; they're defined only for upgrade purposes
+ public static final int STATUS_ACTIVE_NOOUTPUT = 100; // Active, but output connector not installed
+ public static final int STATUS_ACTIVESEEDING_NOOUTPUT = 101; // Active and seeding, but output connector not installed
+ public static final int STATUS_ACTIVE_NEITHER = 102; // Active, but neither repository connector nor output connector installed
+ public static final int STATUS_ACTIVESEEDING_NEITHER = 103; // Active and seeding, but neither repository connector nor output connector installed
+
+ // Need Connector Assessment states
+ public static final int ASSESSMENT_KNOWN = 0; // State is known.
+ public static final int ASSESSMENT_UNKNOWN = 1; // State is unknown, and job needs assessment
+
// Type field values
public static final int TYPE_CONTINUOUS = IJobDescription.TYPE_CONTINUOUS;
public static final int TYPE_SPECIFIED = IJobDescription.TYPE_SPECIFIED;
@@ -192,14 +199,18 @@ public class Jobs extends org.apache.man
public static final String failTimeField = "failtime";
/** When non-null, indicates the number of retries remaining, after which the attempt will be considered to have actually failed */
public static final String failCountField = "failcount";
-
- protected static Map statusMap;
- protected static Map typeMap;
- protected static Map startMap;
- protected static Map hopmodeMap;
+ /** Set to N when the job needs connector-installed assessment */
+ public static final String assessmentStateField = "assessmentstate";
+
+ protected static Map<String,Integer> statusMap;
+ protected static Map<String,Integer> typeMap;
+ protected static Map<String,Integer> startMap;
+ protected static Map<String,Integer> hopmodeMap;
+ protected static Map<String,Integer> assessmentMap;
+
static
{
- statusMap = new HashMap();
+ statusMap = new HashMap<String,Integer>();
statusMap.put("N",new Integer(STATUS_INACTIVE));
statusMap.put("A",new Integer(STATUS_ACTIVE));
statusMap.put("P",new Integer(STATUS_PAUSED));
@@ -247,19 +258,23 @@ public class Jobs extends org.apache.man
statusMap.put("u",new Integer(STATUS_ACTIVESEEDING_NEITHER));
statusMap.put("D",new Integer(STATUS_DELETING_NOOUTPUT));
- typeMap = new HashMap();
+ typeMap = new HashMap<String,Integer>();
typeMap.put("C",new Integer(TYPE_CONTINUOUS));
typeMap.put("S",new Integer(TYPE_SPECIFIED));
- startMap = new HashMap();
+ startMap = new HashMap<String,Integer>();
startMap.put("B",new Integer(START_WINDOWBEGIN));
startMap.put("I",new Integer(START_WINDOWINSIDE));
startMap.put("D",new Integer(START_DISABLE));
- hopmodeMap = new HashMap();
+ hopmodeMap = new HashMap<String,Integer>();
hopmodeMap.put("A",new Integer(HOPCOUNT_ACCURATE));
hopmodeMap.put("N",new Integer(HOPCOUNT_NODELETE));
hopmodeMap.put("V",new Integer(HOPCOUNT_NEVERDELETE));
+
+ assessmentMap = new HashMap<String,Integer>();
+ assessmentMap.put("Y",new Integer(ASSESSMENT_KNOWN));
+ assessmentMap.put("N",new Integer(ASSESSMENT_UNKNOWN));
}
/* Transient vs. non-transient states
@@ -392,6 +407,7 @@ public class Jobs extends org.apache.man
map.put(processIDField,new ColumnDescription("VARCHAR(16)",false,true,null,null,false));
map.put(failTimeField,new ColumnDescription("BIGINT",false,true,null,null,false));
map.put(failCountField,new ColumnDescription("BIGINT",false,true,null,null,false));
+ map.put(assessmentStateField,new ColumnDescription("CHAR(1)",false,true,null,null,false));
performCreate(map,null);
}
else
@@ -421,6 +437,12 @@ public class Jobs extends org.apache.man
insertMap.put(failCountField,new ColumnDescription("BIGINT",false,true,null,null,false));
performAlter(insertMap,null,null,null);
}
+ if (existing.get(assessmentStateField) == null)
+ {
+ Map insertMap = new HashMap();
+ insertMap.put(assessmentStateField,new ColumnDescription("CHAR(1)",false,true,null,null,false));
+ performAlter(insertMap,null,null,null);
+ }
}
// Handle related tables
@@ -524,6 +546,17 @@ public class Jobs extends org.apache.man
analyzeTable();
}
+ /** Build a query returning jobID and status for all jobs matching a set of transformation connection names.
+ * The pipeline subquery is wrapped in EXISTS(...) so that the WHERE clause is a boolean condition
+ * rather than a bare subquery.
+ *@param query is the buffer the query text is appended to.
+ *@param params is the parameter list handed to the pipeline manager's clause builder.
+ *@param transformationConnectionNames are the transformation connection names to match.
+ */
+ public void buildTransformationMatchingQuery(StringBuilder query, ArrayList params,
+ List<String> transformationConnectionNames)
+ {
+ query.append("SELECT ").append(idField).append(",").append(statusField)
+ .append(" FROM ").append(getTableName()).append(" t1 WHERE EXISTS(");
+ pipelineManager.buildQueryClause(query,params,"t1."+idField,transformationConnectionNames);
+ query.append(")");
+ }
+
/** Read schedule records for a specified set of jobs. Cannot use caching!
*/
public ScheduleRecord[][] readScheduleRecords(Long[] jobIDs)
@@ -1275,6 +1308,26 @@ public class Jobs extends org.apache.man
performUpdate(map,"WHERE "+query,list,null);
}
+ /** Signal to a job that an underlying transformation connector has gone away.
+ * NOTE: not yet implemented; the body is an "MHL" placeholder and the call is currently a no-op.
+ *@param jobID is the identifier of the job.
+ *@param oldStatusValue is the current status value for the job.
+ */
+ public void noteTransformationConnectorDeregistration(Long jobID, int oldStatusValue)
+ throws ManifoldCFException
+ {
+ // MHL
+ }
+
+ /** Signal to a job that an underlying transformation connector has been registered.
+ * NOTE: not yet implemented; the body is an "MHL" placeholder and the call is currently a no-op.
+ *@param jobID is the identifier of the job.
+ *@param oldStatusValue is the current status value for the job.
+ */
+ public void noteTransformationConnectorRegistration(Long jobID, int oldStatusValue)
+ throws ManifoldCFException
+ {
+ // MHL
+ }
+
/** Signal to a job that its underlying output connector has gone away.
*@param jobID is the identifier of the job.
*@param oldStatusValue is the current status value for the job.
@@ -3071,6 +3124,35 @@ public class Jobs extends org.apache.man
}
}
+ /** Convert a stored assessment-state string into its ASSESSMENT_xxx value.
+ * A null or empty string yields ASSESSMENT_KNOWN.
+ *@param value is the stored string.
+ *@return the corresponding assessment state.
+ */
+ public static int stringToAssessmentState(String value)
+ throws ManifoldCFException
+ {
+ if (value != null && value.length() > 0)
+ {
+ Integer mapped = assessmentMap.get(value);
+ if (mapped != null)
+ return mapped.intValue();
+ throw new ManifoldCFException("Bad assessment value: '"+value+"'");
+ }
+ return ASSESSMENT_KNOWN;
+ }
+
+ /** Convert an ASSESSMENT_xxx value into its stored string form.
+ *@param value is the assessment state.
+ *@return "Y" for ASSESSMENT_KNOWN, "N" for ASSESSMENT_UNKNOWN.
+ */
+ public static String assessmentStateToString(int value)
+ throws ManifoldCFException
+ {
+ if (value == ASSESSMENT_KNOWN)
+ return "Y";
+ if (value == ASSESSMENT_UNKNOWN)
+ return "N";
+ throw new ManifoldCFException("Unknown assessment state value "+Integer.toString(value));
+ }
+
/** Go from string to hopcount mode.
*/
public static int stringToHopcountMode(String value)
@@ -3078,7 +3160,7 @@ public class Jobs extends org.apache.man
{
if (value == null || value.length() == 0)
return HOPCOUNT_ACCURATE;
- Integer x = (Integer)hopmodeMap.get(value);
+ Integer x = hopmodeMap.get(value);
if (x == null)
throw new ManifoldCFException("Bad hopcount mode value: '"+value+"'");
return x.intValue();
Modified: manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/PipelineManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/PipelineManager.java?rev=1601214&r1=1601213&r2=1601214&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/PipelineManager.java (original)
+++ manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/PipelineManager.java Sun Jun 8 11:44:19 2014
@@ -121,6 +121,18 @@ public class PipelineManager extends org
performDrop(null);
}
+ /** Build a query clause matching a set of connection names.
+ * Appends "SELECT 'x' FROM <table> WHERE ..." to the query buffer, joining the owner id to the
+ * supplied parent id field and restricting the transformation name to the given connection names.
+ *@param query is the buffer the clause text is appended to.
+ *@param params is the caller's parameter list; the clause's parameters are added to it.
+ *@param parentIDField is the (qualified) parent id field to join against.
+ *@param connectionNames are the transformation connection names to match.
+ */
+ public void buildQueryClause(StringBuilder query, ArrayList params,
+ String parentIDField, List<String> connectionNames)
+ {
+ query.append("SELECT 'x' FROM ").append(getTableName()).append(" WHERE ");
+ // Collect parameters in the caller's list; a private list would leave the query without bound values.
+ query.append(buildConjunctionClause(params,new ClauseDescription[]{
+ new JoinClause(parentIDField,ownerIDField),
+ new MultiClause(transformationNameField,connectionNames)}));
+ }
+
/** Fill in a set of pipelines corresponding to a set of owner id's.
*@param returnValues is a map keyed by ownerID, with value of JobDescription.
*@param ownerIDList is the list of owner id's.
Modified: manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1601214&r1=1601213&r2=1601214&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java (original)
+++ manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java Sun Jun 8 11:44:19 2014
@@ -153,7 +153,7 @@ public class WorkerThread extends Thread
transformationNames[k] = job.getPipelineStageConnectionName(k);
transformationSpecifications[k] = job.getPipelineStageSpecification(k);
}
- String newParameterVersion = packParameters(job.getForcedMetadata());
+
DocumentSpecification spec = job.getSpecification();
OutputSpecification outputSpec = job.getOutputSpecification();
int jobType = job.getType();
@@ -316,12 +316,17 @@ public class WorkerThread extends Thread
}
// Get the output version string. Cannot be null.
- String outputVersion = ingester.getOutputDescription(outputName,outputSpec);
+ String outputDescriptionString = ingester.getOutputDescription(outputName,outputSpec);
// Get the transformation version strings. Cannot be null.
- String[] transformationVersions = ingester.getTransformationDescriptions(transformationNames,transformationSpecifications);
+ String[] transformationDescriptionStrings = ingester.getTransformationDescriptions(transformationNames,transformationSpecifications);
+ // New version strings
+ String newOutputVersion = outputDescriptionString;
+ String newParameterVersion = packParameters(job.getForcedMetadata());
+ String newTransformationVersion = packTransformations(transformationNames,transformationDescriptionStrings);
+
Set<String> abortSet = new HashSet<String>();
- VersionActivity versionActivity = new VersionActivity(job.getID(),processID,connectionName,outputName,transformationNames,connMgr,jobManager,ingester,abortSet,outputVersion,transformationVersions,ingestLogger);
+ VersionActivity versionActivity = new VersionActivity(job.getID(),processID,connectionName,outputName,transformationNames,connMgr,jobManager,ingester,abortSet,outputDescriptionString,transformationDescriptionStrings,ingestLogger);
String aclAuthority = connection.getACLAuthority();
if (aclAuthority == null)
@@ -455,9 +460,8 @@ public class WorkerThread extends Thread
String oldDocVersion = oldDocStatus.getDocumentVersion();
String oldAuthorityName = oldDocStatus.getDocumentAuthorityNameString();
String oldOutputVersion = oldDocStatus.getOutputVersion();
+ String oldTransformationVersion = oldDocStatus.getTransformationVersion();
String oldParameterVersion = oldDocStatus.getParameterVersion();
- String[] oldTransformationNames = oldDocStatus.getTransformationNameStrings();
- String[] oldTransformationVersions = oldDocStatus.getTransformationVersions();
// Start the comparison processing
if (newDocVersion.length() == 0)
@@ -467,9 +471,9 @@ public class WorkerThread extends Thread
}
else if (oldDocVersion.equals(newDocVersion) &&
oldAuthorityName.equals(aclAuthority) &&
- oldOutputVersion.equals(outputVersion) &&
- oldParameterVersion.equals(newParameterVersion) &&
- compareTransformations(oldTransformationNames,oldTransformationVersions,transformationNames,transformationVersions))
+ oldOutputVersion.equals(newOutputVersion) &&
+ oldTransformationVersion.equals(newTransformationVersion) &&
+ oldParameterVersion.equals(newParameterVersion))
{
// The old logic was as follows:
//
@@ -530,6 +534,7 @@ public class WorkerThread extends Thread
ProcessActivity activity = new ProcessActivity(job.getID(),processID,
threadContext,rt,jobManager,ingester,
connectionName,outputName,transformationNames,
+ outputDescriptionString,transformationDescriptionStrings,
currentTime,
job.getExpiration(),
job.getForcedMetadata(),
@@ -537,7 +542,7 @@ public class WorkerThread extends Thread
job.getMaxInterval(),
job.getHopcountMode(),
connection,connector,connMgr,legalLinkTypes,ingestLogger,abortSet,
- outputVersion,newParameterVersion,transformationVersions);
+ newOutputVersion,newTransformationVersion,newParameterVersion);
try
{
@@ -1194,6 +1199,25 @@ public class WorkerThread extends Thread
}
}
+ /** Pack the transformation connection names and their description strings into one string.
+ * The names are packed as a list with delimiter '+', followed by the descriptions with delimiter '!'.
+ */
+ protected static String packTransformations(String[] transformationNames, String[] transformationDescriptionStrings)
+ {
+ StringBuilder packed = new StringBuilder();
+ // Names first, then descriptions
+ packList(packed,transformationNames,'+');
+ packList(packed,transformationDescriptionStrings,'!');
+ String result = packed.toString();
+ return result;
+ }
+
+ /** Pack a variable-length list: first the element count, then each element in order,
+ * all written through pack() with the given delimiter.
+ */
+ protected static void packList(StringBuilder output, String[] values, char delimiter)
+ {
+ // Length prefix
+ pack(output,Integer.toString(values.length),delimiter);
+ // Elements
+ for (String element : values)
+ {
+ pack(output,element,delimiter);
+ }
+ }
+
protected static String packParameters(Map<String,Set<String>> forcedParameters)
{
StringBuilder sb = new StringBuilder();
@@ -1237,20 +1261,6 @@ public class WorkerThread extends Thread
sb.append(delim);
}
- protected static boolean compareTransformations(String[] oldTransformationNames, String[] oldTransformationVersions,
- String[] transformationNames, String[] transformationVersions)
- {
- if (oldTransformationNames.length != transformationNames.length)
- return false;
- for (int i = 0; i < oldTransformationNames.length; i++)
- {
- if (!oldTransformationNames[i].equals(transformationNames[i]) ||
- !oldTransformationVersions[i].equals(transformationVersions[i]))
- return false;
- }
- return true;
- }
-
/** The maximum number of adds that happen in a single transaction */
protected static final int MAX_ADDS_IN_TRANSACTION = 20;
@@ -1269,8 +1279,8 @@ public class WorkerThread extends Thread
protected final IJobManager jobManager;
protected final IIncrementalIngester ingester;
protected final Set<String> abortSet;
- protected final String outputVersion;
- protected final String[] transformationVersions;
+ protected final String outputDescriptionString;
+ protected final String[] transformationDescriptionStrings;
protected final CheckActivity checkActivity;
/** Constructor.
*/
@@ -1279,7 +1289,7 @@ public class WorkerThread extends Thread
String[] transformationConnectionNames,
IRepositoryConnectionManager connMgr,
IJobManager jobManager, IIncrementalIngester ingester, Set<String> abortSet,
- String outputVersion, String[] transformationVersions,
+ String outputDescriptionString, String[] transformationDescriptionStrings,
CheckActivity checkActivity)
{
this.jobID = jobID;
@@ -1291,8 +1301,8 @@ public class WorkerThread extends Thread
this.jobManager = jobManager;
this.ingester = ingester;
this.abortSet = abortSet;
- this.outputVersion = outputVersion;
- this.transformationVersions = transformationVersions;
+ this.outputDescriptionString = outputDescriptionString;
+ this.transformationDescriptionStrings = transformationDescriptionStrings;
this.checkActivity = checkActivity;
}
@@ -1305,8 +1315,8 @@ public class WorkerThread extends Thread
throws ManifoldCFException, ServiceInterruption
{
return ingester.checkMimeTypeIndexable(
- transformationConnectionNames,transformationVersions,
- outputConnectionName,outputVersion,mimeType,
+ transformationConnectionNames,transformationDescriptionStrings,
+ outputConnectionName,outputDescriptionString,mimeType,
checkActivity);
}
@@ -1319,8 +1329,8 @@ public class WorkerThread extends Thread
throws ManifoldCFException, ServiceInterruption
{
return ingester.checkDocumentIndexable(
- transformationConnectionNames,transformationVersions,
- outputConnectionName,outputVersion,localFile,
+ transformationConnectionNames,transformationDescriptionStrings,
+ outputConnectionName,outputDescriptionString,localFile,
checkActivity);
}
@@ -1333,8 +1343,8 @@ public class WorkerThread extends Thread
throws ManifoldCFException, ServiceInterruption
{
return ingester.checkLengthIndexable(
- transformationConnectionNames,transformationVersions,
- outputConnectionName,outputVersion,length,
+ transformationConnectionNames,transformationDescriptionStrings,
+ outputConnectionName,outputDescriptionString,length,
checkActivity);
}
@@ -1348,8 +1358,8 @@ public class WorkerThread extends Thread
throws ManifoldCFException, ServiceInterruption
{
return ingester.checkURLIndexable(
- transformationConnectionNames,transformationVersions,
- outputConnectionName,outputVersion,url,
+ transformationConnectionNames,transformationDescriptionStrings,
+ outputConnectionName,outputDescriptionString,url,
checkActivity);
}
@@ -1502,6 +1512,8 @@ public class WorkerThread extends Thread
protected final String connectionName;
protected final String outputName;
protected final String[] transformationConnectionNames;
+ protected final String outputDescriptionString;
+ protected final String[] transformationDescriptionStrings;
protected final long currentTime;
protected final Long expireInterval;
protected final Map<String,Set<String>> forcedMetadata;
@@ -1516,8 +1528,8 @@ public class WorkerThread extends Thread
protected final IReprioritizationTracker rt;
protected final Set<String> abortSet;
protected final String outputVersion;
+ protected final String transformationVersion;
protected final String parameterVersion;
- protected final String[] transformationVersions;
// We submit references in bulk, because that's way more efficient.
protected final Map<DocumentReference,DocumentReference> referenceList = new HashMap<DocumentReference,DocumentReference>();
@@ -1540,6 +1552,7 @@ public class WorkerThread extends Thread
IReprioritizationTracker rt, IJobManager jobManager,
IIncrementalIngester ingester,
String connectionName, String outputName, String[] transformationConnectionNames,
+ String outputDescriptionString, String[] transformationDescriptionStrings,
long currentTime,
Long expireInterval,
Map<String,Set<String>> forcedMetadata,
@@ -1549,7 +1562,7 @@ public class WorkerThread extends Thread
IRepositoryConnection connection, IRepositoryConnector connector,
IRepositoryConnectionManager connMgr, String[] legalLinkTypes, OutputActivity ingestLogger,
Set<String> abortSet,
- String outputVersion, String parameterVersion, String[] transformationVersions)
+ String outputVersion, String transformationVersion, String parameterVersion)
{
this.jobID = jobID;
this.processID = processID;
@@ -1559,15 +1572,15 @@ public class WorkerThread extends Thread
this.ingester = ingester;
this.connectionName = connectionName;
this.outputName = outputName;
+ this.outputDescriptionString = outputDescriptionString;
this.transformationConnectionNames = transformationConnectionNames;
+ this.transformationDescriptionStrings = transformationDescriptionStrings;
this.currentTime = currentTime;
this.expireInterval = expireInterval;
this.forcedMetadata = forcedMetadata;
this.recrawlInterval = recrawlInterval;
this.maxInterval = maxInterval;
this.hopcountMode = hopcountMode;
-
- //this.job = job;
this.connection = connection;
this.connector = connector;
this.connMgr = connMgr;
@@ -1576,7 +1589,7 @@ public class WorkerThread extends Thread
this.abortSet = abortSet;
this.outputVersion = outputVersion;
this.parameterVersion = parameterVersion;
- this.transformationVersions = transformationVersions;
+ this.transformationVersion = transformationVersion;
}
/** Clean up any dangling information, before abandoning this process activity object */
@@ -1833,9 +1846,11 @@ public class WorkerThread extends Thread
// First, we need to add into the metadata the stuff from the job description.
ingester.documentIngest(transformationConnectionNames,
+ transformationDescriptionStrings,
outputName,
+ outputDescriptionString,
connectionName,documentIdentifierHash,
- version,transformationVersions,outputVersion,parameterVersion,
+ version,transformationVersion,outputVersion,parameterVersion,
connection.getACLAuthority(),
data,currentTime,
documentURI,
@@ -2186,8 +2201,8 @@ public class WorkerThread extends Thread
throws ManifoldCFException, ServiceInterruption
{
return ingester.checkMimeTypeIndexable(
- transformationConnectionNames,transformationVersions,
- outputName,outputVersion,mimeType,
+ transformationConnectionNames,transformationDescriptionStrings,
+ outputName,outputDescriptionString,mimeType,
ingestLogger);
}
@@ -2200,8 +2215,8 @@ public class WorkerThread extends Thread
throws ManifoldCFException, ServiceInterruption
{
return ingester.checkDocumentIndexable(
- transformationConnectionNames,transformationVersions,
- outputName,outputVersion,localFile,
+ transformationConnectionNames,transformationDescriptionStrings,
+ outputName,outputDescriptionString,localFile,
ingestLogger);
}
@@ -2214,8 +2229,8 @@ public class WorkerThread extends Thread
throws ManifoldCFException, ServiceInterruption
{
return ingester.checkLengthIndexable(
- transformationConnectionNames,transformationVersions,
- outputName,outputVersion,length,
+ transformationConnectionNames,transformationDescriptionStrings,
+ outputName,outputDescriptionString,length,
ingestLogger);
}
@@ -2229,8 +2244,8 @@ public class WorkerThread extends Thread
throws ManifoldCFException, ServiceInterruption
{
return ingester.checkURLIndexable(
- transformationConnectionNames,transformationVersions,
- outputName,outputVersion,url,
+ transformationConnectionNames,transformationDescriptionStrings,
+ outputName,outputDescriptionString,url,
ingestLogger);
}