You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/06/06 14:35:34 UTC
svn commit: r1600869 - in /manifoldcf/branches/CONNECTORS-946/framework:
agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/
agents/src/main/java/org/apache/manifoldcf/agents/interfaces/
agents/src/main/java/org/apache/manifoldcf/agent...
Author: kwright
Date: Fri Jun 6 12:35:33 2014
New Revision: 1600869
URL: http://svn.apache.org/r1600869
Log:
Convert WorkerThread to the new APIs
Modified:
manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/OutputConnectionManagerFactory.java
manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/TransformationConnectionManagerFactory.java
manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/ManifoldCF.java
manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java
manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
Modified: manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java?rev=1600869&r1=1600868&r2=1600869&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java (original)
+++ manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java Fri Jun 6 12:35:33 2014
@@ -472,6 +472,8 @@ public class IncrementalIngester extends
long recordTime, IOutputActivity activities)
throws ManifoldCFException, ServiceInterruption
{
+ activities = new OutputActivitiesWrapper(activities,outputConnectionName);
+
IOutputConnection connection = connectionManager.load(outputConnectionName);
String docKey = makeKey(identifierClass,identifierHash);
@@ -602,6 +604,8 @@ public class IncrementalIngester extends
IOutputActivity activities)
throws ManifoldCFException, ServiceInterruption
{
+ activities = new OutputActivitiesWrapper(activities,outputConnectionName);
+
// MHL
IOutputConnection connection = connectionManager.load(outputConnectionName);
@@ -897,6 +901,7 @@ public class IncrementalIngester extends
IOutputRemoveActivity activities)
throws ManifoldCFException, ServiceInterruption
{
+
// Segregate request by connection names
HashMap keyMap = new HashMap();
int i = 0;
@@ -945,6 +950,8 @@ public class IncrementalIngester extends
IOutputRemoveActivity activities)
throws ManifoldCFException, ServiceInterruption
{
+ activities = new OutputRemoveActivitiesWrapper(activities,outputConnectionName);
+
IOutputConnection connection = connectionManager.load(outputConnectionName);
if (Logging.ingest.isDebugEnabled())
@@ -1848,9 +1855,17 @@ public class IncrementalIngester extends
{
Long id = (Long)row.getValue(idField);
String lastVersion = (String)row.getValue(lastVersionField);
+ if (lastVersion == null)
+ lastVersion = "";
String lastOutputVersion = (String)row.getValue(lastOutputVersionField);
+ if (lastOutputVersion == null)
+ lastOutputVersion = "";
String authorityName = (String)row.getValue(authorityNameField);
+ if (authorityName == null)
+ authorityName = "";
String paramVersion = (String)row.getValue(forcedParamsField);
+ if (paramVersion == null)
+ paramVersion = "";
rval[position.intValue()] = new DocumentIngestStatus(lastVersion,new String[0],lastOutputVersion,authorityName,paramVersion,new String[0]);
}
}
@@ -1928,4 +1943,140 @@ public class IncrementalIngester extends
return outputVersion;
}
}
+
+ /** Wrapper class for add activity. This handles conversion of output connector activity logging to
+ * qualified activity names */
+ protected static class OutputRecordingActivity implements IOutputHistoryActivity
+ {
+ protected final IOutputHistoryActivity activityProvider;
+ protected final String outputConnectionName;
+
+ public OutputRecordingActivity(IOutputHistoryActivity activityProvider, String outputConnectionName)
+ {
+ this.activityProvider = activityProvider;
+ this.outputConnectionName = outputConnectionName;
+ }
+
+ /** Record time-stamped information about the activity of the output connector.
+ *@param startTime is either null or the time since the start of epoch in milliseconds (Jan 1, 1970). Every
+ * activity has an associated time; the startTime field records when the activity began. A null value
+ * indicates that the start time and the finishing time are the same.
+ *@param activityType is a string which is fully interpretable only in the context of the connector involved, which is
+ * used to categorize what kind of activity is being recorded. For example, a web connector might record a
+ * "fetch document" activity. Cannot be null.
+ *@param dataSize is the number of bytes of data involved in the activity, or null if not applicable.
+ *@param entityURI is a (possibly long) string which identifies the object involved in the history record.
+ * The interpretation of this field will differ from connector to connector. May be null.
+ *@param resultCode contains a terse description of the result of the activity. The description is limited in
+ * size to 255 characters, and can be interpreted only in the context of the current connector. May be null.
+ *@param resultDescription is a (possibly long) human-readable string which adds detail, if required, to the result
+ * described in the resultCode field. This field is not meant to be queried on. May be null.
+ */
+ @Override
+ public void recordActivity(Long startTime, String activityType, Long dataSize,
+ String entityURI, String resultCode, String resultDescription)
+ throws ManifoldCFException
+ {
+ activityProvider.recordActivity(startTime,ManifoldCF.qualifyOutputActivityName(activityType,outputConnectionName),
+ dataSize,entityURI,resultCode,resultDescription);
+ }
+
+ }
+
+ protected static class OutputRemoveActivitiesWrapper extends OutputRecordingActivity implements IOutputRemoveActivity
+ {
+ protected final IOutputRemoveActivity activities;
+
+ public OutputRemoveActivitiesWrapper(IOutputRemoveActivity activities, String outputConnectionName)
+ {
+ super(activities,outputConnectionName);
+ this.activities = activities;
+ }
+
+ }
+
+ protected static class OutputActivitiesWrapper extends OutputRecordingActivity implements IOutputActivity
+ {
+ protected final IOutputActivity activities;
+
+ public OutputActivitiesWrapper(IOutputActivity activities, String outputConnectionName)
+ {
+ super(activities,outputConnectionName);
+ this.activities = activities;
+ }
+
+ /** Qualify an access token appropriately, to match access tokens as returned by mod_aa. This method
+ * includes the authority name with the access token, if any, so that each authority may establish its own token space.
+ *@param authorityNameString is the name of the authority to use to qualify the access token.
+ *@param accessToken is the raw, repository access token.
+ *@return the properly qualified access token.
+ */
+ @Override
+ public String qualifyAccessToken(String authorityNameString, String accessToken)
+ throws ManifoldCFException
+ {
+ return activities.qualifyAccessToken(authorityNameString,accessToken);
+ }
+
+ /** Send a document via the pipeline to the next output connection.
+ *@param document is the document data to be processed (handed to the output data store).
+ *@return the document status (accepted or permanently rejected); return codes are listed in IPipelineConnector.
+ */
+ public int sendDocument(RepositoryDocument document)
+ throws ManifoldCFException, ServiceInterruption
+ {
+ return activities.sendDocument(document);
+ }
+
+ /** Detect if a mime type is acceptable downstream or not. This method is used to determine whether it makes sense to fetch a document
+ * in the first place.
+ *@param mimeType is the mime type of the document.
+ *@return true if the mime type can be accepted by the downstream connection.
+ */
+ @Override
+ public boolean checkMimeTypeIndexable(String mimeType)
+ throws ManifoldCFException, ServiceInterruption
+ {
+ return activities.checkMimeTypeIndexable(mimeType);
+ }
+
+ /** Pre-determine whether a document (passed here as a File object) is acceptable downstream. This method is
+ * used to determine whether a document needs to be actually transferred. This hook is provided mainly to support
+ * search engines that only handle a small set of accepted file types.
+ *@param localFile is the local file to check.
+ *@return true if the file is acceptable by the downstream connection.
+ */
+ @Override
+ public boolean checkDocumentIndexable(File localFile)
+ throws ManifoldCFException, ServiceInterruption
+ {
+ return activities.checkDocumentIndexable(localFile);
+ }
+
+ /** Pre-determine whether a document's length is acceptable downstream. This method is used
+ * to determine whether to fetch a document in the first place.
+ *@param length is the length of the document.
+ *@return true if the file is acceptable by the downstream connection.
+ */
+ @Override
+ public boolean checkLengthIndexable(long length)
+ throws ManifoldCFException, ServiceInterruption
+ {
+ return activities.checkLengthIndexable(length);
+ }
+
+ /** Pre-determine whether a document's URL is acceptable downstream. This method is used
+ * to help filter out documents that cannot be indexed in advance.
+ *@param url is the URL of the document.
+ *@return true if the URL is acceptable by the downstream connection.
+ */
+ @Override
+ public boolean checkURLIndexable(String url)
+ throws ManifoldCFException, ServiceInterruption
+ {
+ return activities.checkURLIndexable(url);
+ }
+
+ }
+
}
Modified: manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/OutputConnectionManagerFactory.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/OutputConnectionManagerFactory.java?rev=1600869&r1=1600868&r2=1600869&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/OutputConnectionManagerFactory.java (original)
+++ manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/OutputConnectionManagerFactory.java Fri Jun 6 12:35:33 2014
@@ -66,26 +66,23 @@ public class OutputConnectionManagerFact
{
IOutputConnectionManager manager = make(tc);
IOutputConnection[] connections = manager.getAllConnections();
- HashMap map = new HashMap();
- int i = 0;
- while (i < connections.length)
+ Set<String> map = new HashSet();
+ for (IOutputConnection connection : connections)
{
- IOutputConnection connection = connections[i++];
String connectionName = connection.getName();
String[] activities = OutputConnectorFactory.getActivitiesList(tc,connection.getClassName());
- int j = 0;
- while (j < activities.length)
+ for (String activityName : activities)
{
- String activity = activities[j++] + " ("+connectionName+")";
- map.put(activity,activity);
+ String activity = ManifoldCF.qualifyOutputActivityName(activityName,connectionName);
+ map.add(activity);
}
}
String[] rval = new String[map.size()];
- i = 0;
- Iterator iter = map.keySet().iterator();
+ int i = 0;
+ Iterator<String> iter = map.iterator();
while (iter.hasNext())
{
- rval[i++] = (String)iter.next();
+ rval[i++] = iter.next();
}
java.util.Arrays.sort(rval);
return rval;
Modified: manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/TransformationConnectionManagerFactory.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/TransformationConnectionManagerFactory.java?rev=1600869&r1=1600868&r2=1600869&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/TransformationConnectionManagerFactory.java (original)
+++ manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/TransformationConnectionManagerFactory.java Fri Jun 6 12:35:33 2014
@@ -73,7 +73,7 @@ public class TransformationConnectionMan
String[] activities = TransformationConnectorFactory.getActivitiesList(tc,connection.getClassName());
for (String baseActivity : activities)
{
- String activity = baseActivity + " ("+connectionName+")";
+ String activity = ManifoldCF.qualifyTransformationActivityName(baseActivity, connectionName);
map.add(activity);
}
}
Modified: manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/ManifoldCF.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/ManifoldCF.java?rev=1600869&r1=1600868&r2=1600869&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/ManifoldCF.java (original)
+++ manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/system/ManifoldCF.java Fri Jun 6 12:35:33 2014
@@ -171,6 +171,26 @@ public class ManifoldCF extends org.apac
AgentManagerFactory.noteOutputConnectionChange(threadContext,connectionName);
}
+ /** Qualify output activity name.
+ *@param outputActivityName is the name of the output activity.
+ *@param outputConnectionName is the corresponding name of the output connection.
+ *@return the qualified (global) activity name.
+ */
+ public static String qualifyOutputActivityName(String outputActivityName, String outputConnectionName)
+ {
+ return outputActivityName+" ("+outputConnectionName+")";
+ }
+
+ /** Qualify transformation activity name.
+ *@param transformationActivityName is the name of the transformation activity.
+ *@param transformationConnectionName is the corresponding name of the transformation connection.
+ *@return the qualified (global) activity name.
+ */
+ public static String qualifyTransformationActivityName(String transformationActivityName, String transformationConnectionName)
+ {
+ return transformationActivityName+" ["+transformationConnectionName+"]";
+ }
+
// Helper methods for API support. These are made public so connectors can use them to implement the executeCommand method.
// These are the universal node types.
Modified: manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java?rev=1600869&r1=1600868&r2=1600869&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java (original)
+++ manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ManifoldCF.java Fri Jun 6 12:35:33 2014
@@ -1123,16 +1123,6 @@ public class ManifoldCF extends org.apac
}
- /** Qualify output activity name.
- *@param outputActivityName is the name of the output activity.
- *@param outputConnectionName is the corresponding name of the output connection.
- *@return the qualified (global) activity name.
- */
- public static String qualifyOutputActivityName(String outputActivityName, String outputConnectionName)
- {
- return outputActivityName+" ("+outputConnectionName+")";
- }
-
/** Get the activities list for a given repository connection.
*/
public static String[] getActivitiesList(IThreadContext threadContext, String connectionName)
@@ -1143,31 +1133,35 @@ public class ManifoldCF extends org.apac
if (thisConnection == null)
return null;
String[] outputActivityList = OutputConnectionManagerFactory.getAllOutputActivities(threadContext);
+ String[] transformationActivityList = TransformationConnectionManagerFactory.getAllTransformationActivities(threadContext);
String[] connectorActivityList = RepositoryConnectorFactory.getActivitiesList(threadContext,thisConnection.getClassName());
String[] globalActivityList = IRepositoryConnectionManager.activitySet;
- String[] activityList = new String[outputActivityList.length + ((connectorActivityList==null)?0:connectorActivityList.length) + globalActivityList.length];
+ String[] activityList = new String[transformationActivityList.length + outputActivityList.length + ((connectorActivityList==null)?0:connectorActivityList.length) + globalActivityList.length];
int k2 = 0;
- int j;
+ if (transformationActivityList != null)
+ {
+ for (String transformationActivity: transformationActivityList)
+ {
+ activityList[k2++] = transformationActivity;
+ }
+ }
if (outputActivityList != null)
{
- j = 0;
- while (j < outputActivityList.length)
+ for (String outputActivity : outputActivityList)
{
- activityList[k2++] = outputActivityList[j++];
+ activityList[k2++] = outputActivity;
}
}
if (connectorActivityList != null)
{
- j = 0;
- while (j < connectorActivityList.length)
+ for (String connectorActivity : connectorActivityList)
{
- activityList[k2++] = connectorActivityList[j++];
+ activityList[k2++] = connectorActivity;
}
}
- j = 0;
- while (j < globalActivityList.length)
+ for (String globalActivity : globalActivityList)
{
- activityList[k2++] = globalActivityList[j++];
+ activityList[k2++] = globalActivity;
}
java.util.Arrays.sort(activityList);
return activityList;
Modified: manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1600869&r1=1600868&r2=1600869&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java (original)
+++ manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java Fri Jun 6 12:35:33 2014
@@ -160,7 +160,7 @@ public class WorkerThread extends Thread
IRepositoryConnection connection = qds.getConnection();
- OutputActivity ingestLogger = new OutputActivity(connectionName,connMgr,outputName);
+ OutputActivity ingestLogger = new OutputActivity(connectionName,connMgr);
// The flow through this section of the code is as follows.
// (1) We start with a list of documents
@@ -315,16 +315,18 @@ public class WorkerThread extends Thread
}
}
- // Get the output version string.
+ // Get the output version string. Cannot be null.
String outputVersion = ingester.getOutputDescription(outputName,outputSpec);
- // Get the transformation version strings.
+ // Get the transformation version strings. Cannot be null.
String[] transformationVersions = ingester.getTransformationDescriptions(transformationNames,transformationSpecifications);
Set<String> abortSet = new HashSet<String>();
- VersionActivity versionActivity = new VersionActivity(job.getID(),processID,connectionName,outputName,transformationNames,connMgr,jobManager,ingester,abortSet,outputVersion,transformationVersions);
+ VersionActivity versionActivity = new VersionActivity(job.getID(),processID,connectionName,outputName,transformationNames,connMgr,jobManager,ingester,abortSet,outputVersion,transformationVersions,ingestLogger);
String aclAuthority = connection.getACLAuthority();
- boolean isDefaultAuthority = (aclAuthority == null || aclAuthority.length() == 0);
+ if (aclAuthority == null)
+ aclAuthority = "";
+ boolean isDefaultAuthority = (aclAuthority.length() == 0);
if (Logging.threads.isDebugEnabled())
Logging.threads.debug("Worker thread getting versions for "+Integer.toString(currentDocIDArray.length)+" documents");
@@ -400,10 +402,6 @@ public class WorkerThread extends Thread
// This try{ } is for releasing document versions at the connector level.
try
{
- // Organize what we need for document status comparison, and get it into a canonical form.
- String newOutputVersion = outputVersion;
- if (newOutputVersion == null)
- newOutputVersion = "";
// Loop through documents now, and amass what we need to fetch.
// We also need to tally: (1) what needs to be marked as deleted via
@@ -429,9 +427,6 @@ public class WorkerThread extends Thread
DocumentIngestStatus oldDocStatus = qd.getLastIngestedStatus();
String documentIDHash = dd.getDocumentIdentifierHash();
String newDocVersion = newVersionStringArray[i];
- String newAuthorityName = aclAuthority;
- if (newAuthorityName == null)
- newAuthorityName = "";
versionMap.put(dd.getDocumentIdentifierHash(),newDocVersion);
@@ -458,17 +453,11 @@ public class WorkerThread extends Thread
// that was there before is there now (which may mean a rescan),
// or (2) there are different versions (which ALWAYS means a rescan).
String oldDocVersion = oldDocStatus.getDocumentVersion();
- if (oldDocVersion == null)
- oldDocVersion = "";
String oldAuthorityName = oldDocStatus.getDocumentAuthorityNameString();
- if (oldAuthorityName == null)
- oldAuthorityName = "";
String oldOutputVersion = oldDocStatus.getOutputVersion();
- if (oldOutputVersion == null)
- oldOutputVersion = "";
String oldParameterVersion = oldDocStatus.getParameterVersion();
- if (oldParameterVersion == null)
- oldParameterVersion = "";
+ String[] oldTransformationNames = oldDocStatus.getTransformationNameStrings();
+ String[] oldTransformationVersions = oldDocStatus.getTransformationVersions();
// Start the comparison processing
if (newDocVersion.length() == 0)
@@ -477,9 +466,10 @@ public class WorkerThread extends Thread
allowIngest = true;
}
else if (oldDocVersion.equals(newDocVersion) &&
- oldAuthorityName.equals(newAuthorityName) &&
- oldOutputVersion.equals(newOutputVersion) &&
- oldParameterVersion.equals(newParameterVersion))
+ oldAuthorityName.equals(aclAuthority) &&
+ oldOutputVersion.equals(outputVersion) &&
+ oldParameterVersion.equals(newParameterVersion) &&
+ compareTransformations(oldTransformationNames,oldTransformationVersions,transformationNames,transformationVersions))
{
// The old logic was as follows:
//
@@ -1247,6 +1237,20 @@ public class WorkerThread extends Thread
sb.append(delim);
}
+ protected static boolean compareTransformations(String[] oldTransformationNames, String[] oldTransformationVersions,
+ String[] transformationNames, String[] transformationVersions)
+ {
+ if (oldTransformationNames.length != transformationNames.length)
+ return false;
+ for (int i = 0; i < oldTransformationNames.length; i++)
+ {
+ if (!oldTransformationNames[i].equals(transformationNames[i]) ||
+ !oldTransformationVersions[i].equals(transformationVersions[i]))
+ return false;
+ }
+ return true;
+ }
+
/** The maximum number of adds that happen in a single transaction */
protected static final int MAX_ADDS_IN_TRANSACTION = 20;
@@ -1267,7 +1271,7 @@ public class WorkerThread extends Thread
protected final Set<String> abortSet;
protected final String outputVersion;
protected final String[] transformationVersions;
-
+ protected final CheckActivity checkActivity;
/** Constructor.
*/
public VersionActivity(Long jobID, String processID,
@@ -1275,7 +1279,8 @@ public class WorkerThread extends Thread
String[] transformationConnectionNames,
IRepositoryConnectionManager connMgr,
IJobManager jobManager, IIncrementalIngester ingester, Set<String> abortSet,
- String outputVersion, String[] transformationVersions)
+ String outputVersion, String[] transformationVersions,
+ CheckActivity checkActivity)
{
this.jobID = jobID;
this.processID = processID;
@@ -1288,6 +1293,7 @@ public class WorkerThread extends Thread
this.abortSet = abortSet;
this.outputVersion = outputVersion;
this.transformationVersions = transformationVersions;
+ this.checkActivity = checkActivity;
}
/** Check whether a mime type is indexable by the currently specified output connector.
@@ -1298,8 +1304,10 @@ public class WorkerThread extends Thread
public boolean checkMimeTypeIndexable(String mimeType)
throws ManifoldCFException, ServiceInterruption
{
- // MHL
- return ingester.checkMimeTypeIndexable(outputConnectionName,outputVersion,mimeType);
+ return ingester.checkMimeTypeIndexable(
+ transformationConnectionNames,transformationVersions,
+ outputConnectionName,outputVersion,mimeType,
+ checkActivity);
}
/** Check whether a document is indexable by the currently specified output connector.
@@ -1310,8 +1318,10 @@ public class WorkerThread extends Thread
public boolean checkDocumentIndexable(File localFile)
throws ManifoldCFException, ServiceInterruption
{
- // MHL
- return ingester.checkDocumentIndexable(outputConnectionName,outputVersion,localFile);
+ return ingester.checkDocumentIndexable(
+ transformationConnectionNames,transformationVersions,
+ outputConnectionName,outputVersion,localFile,
+ checkActivity);
}
/** Check whether a document of a specified length is indexable by the currently specified output connector.
@@ -1322,8 +1332,10 @@ public class WorkerThread extends Thread
public boolean checkLengthIndexable(long length)
throws ManifoldCFException, ServiceInterruption
{
- // MHL
- return ingester.checkLengthIndexable(outputConnectionName,outputVersion,length);
+ return ingester.checkLengthIndexable(
+ transformationConnectionNames,transformationVersions,
+ outputConnectionName,outputVersion,length,
+ checkActivity);
}
/** Pre-determine whether a document's URL is indexable by this connector. This method is used by participating repository connectors
@@ -1335,8 +1347,10 @@ public class WorkerThread extends Thread
public boolean checkURLIndexable(String url)
throws ManifoldCFException, ServiceInterruption
{
- // MHL
- return ingester.checkURLIndexable(outputConnectionName,outputVersion,url);
+ return ingester.checkURLIndexable(
+ transformationConnectionNames,transformationVersions,
+ outputConnectionName,outputVersion,url,
+ checkActivity);
}
/** Record time-stamped information about the activity of the connector.
@@ -1487,7 +1501,7 @@ public class WorkerThread extends Thread
protected final IIncrementalIngester ingester;
protected final String connectionName;
protected final String outputName;
- protected final String[] transformationNames;
+ protected final String[] transformationConnectionNames;
protected final long currentTime;
protected final Long expireInterval;
protected final Map<String,Set<String>> forcedMetadata;
@@ -1525,7 +1539,7 @@ public class WorkerThread extends Thread
IThreadContext threadContext,
IReprioritizationTracker rt, IJobManager jobManager,
IIncrementalIngester ingester,
- String connectionName, String outputName, String[] transformationNames,
+ String connectionName, String outputName, String[] transformationConnectionNames,
long currentTime,
Long expireInterval,
Map<String,Set<String>> forcedMetadata,
@@ -1545,7 +1559,7 @@ public class WorkerThread extends Thread
this.ingester = ingester;
this.connectionName = connectionName;
this.outputName = outputName;
- this.transformationNames = transformationNames;
+ this.transformationConnectionNames = transformationConnectionNames;
this.currentTime = currentTime;
this.expireInterval = expireInterval;
this.forcedMetadata = forcedMetadata;
@@ -1818,7 +1832,7 @@ public class WorkerThread extends Thread
}
// First, we need to add into the metadata the stuff from the job description.
- ingester.documentIngest(transformationNames,
+ ingester.documentIngest(transformationConnectionNames,
outputName,
connectionName,documentIdentifierHash,
version,transformationVersions,outputVersion,parameterVersion,
@@ -2171,8 +2185,10 @@ public class WorkerThread extends Thread
public boolean checkMimeTypeIndexable(String mimeType)
throws ManifoldCFException, ServiceInterruption
{
- // MHL
- return ingester.checkMimeTypeIndexable(outputName,outputVersion,mimeType);
+ return ingester.checkMimeTypeIndexable(
+ transformationConnectionNames,transformationVersions,
+ outputName,outputVersion,mimeType,
+ ingestLogger);
}
/** Check whether a document is indexable by the currently specified output connector.
@@ -2183,8 +2199,10 @@ public class WorkerThread extends Thread
public boolean checkDocumentIndexable(File localFile)
throws ManifoldCFException, ServiceInterruption
{
- // MHL
- return ingester.checkDocumentIndexable(outputName,outputVersion,localFile);
+ return ingester.checkDocumentIndexable(
+ transformationConnectionNames,transformationVersions,
+ outputName,outputVersion,localFile,
+ ingestLogger);
}
/** Check whether a document of a specified length is indexable by the currently specified output connector.
@@ -2195,8 +2213,10 @@ public class WorkerThread extends Thread
public boolean checkLengthIndexable(long length)
throws ManifoldCFException, ServiceInterruption
{
- // MHL
- return ingester.checkLengthIndexable(outputName,outputVersion,length);
+ return ingester.checkLengthIndexable(
+ transformationConnectionNames,transformationVersions,
+ outputName,outputVersion,length,
+ ingestLogger);
}
/** Pre-determine whether a document's URL is indexable by this connector. This method is used by participating repository connectors
@@ -2208,7 +2228,10 @@ public class WorkerThread extends Thread
public boolean checkURLIndexable(String url)
throws ManifoldCFException, ServiceInterruption
{
- return ingester.checkURLIndexable(outputName,outputVersion,url);
+ return ingester.checkURLIndexable(
+ transformationConnectionNames,transformationVersions,
+ outputName,outputVersion,url,
+ ingestLogger);
}
/** Create a global string from a simple string.
@@ -2506,23 +2529,78 @@ public class WorkerThread extends Thread
}
}
+ /** The check activity class */
+ protected static class CheckActivity implements IOutputCheckActivity
+ {
+ public CheckActivity()
+ {
+ }
+
+ /** Detect if a mime type is acceptable downstream or not. This method is used to determine whether it makes sense to fetch a document
+ * in the first place.
+ *@param mimeType is the mime type of the document.
+ *@return true if the mime type can be accepted by the downstream connection.
+ */
+ @Override
+ public boolean checkMimeTypeIndexable(String mimeType)
+ throws ManifoldCFException, ServiceInterruption
+ {
+ return false;
+ }
+
+ /** Pre-determine whether a document (passed here as a File object) is acceptable downstream. This method is
+ * used to determine whether a document needs to be actually transferred. This hook is provided mainly to support
+ * search engines that only handle a small set of accepted file types.
+ *@param localFile is the local file to check.
+ *@return true if the file is acceptable by the downstream connection.
+ */
+ @Override
+ public boolean checkDocumentIndexable(File localFile)
+ throws ManifoldCFException, ServiceInterruption
+ {
+ return false;
+ }
+
+ /** Pre-determine whether a document's length is acceptable downstream. This method is used
+ * to determine whether to fetch a document in the first place.
+ *@param length is the length of the document.
+ *@return true if the file is acceptable by the downstream connection.
+ */
+ @Override
+ public boolean checkLengthIndexable(long length)
+ throws ManifoldCFException, ServiceInterruption
+ {
+ return false;
+ }
+
+ /** Pre-determine whether a document's URL is acceptable downstream. This method is used
+ * to help filter out documents that cannot be indexed in advance.
+ *@param url is the URL of the document.
+ *@return true if the URL is acceptable by the downstream connection.
+ */
+ @Override
+ public boolean checkURLIndexable(String url)
+ throws ManifoldCFException, ServiceInterruption
+ {
+ return false;
+ }
+
+ }
+
/** The ingest logger class */
- protected static class OutputActivity implements IOutputActivity
+ protected static class OutputActivity extends CheckActivity implements IOutputActivity
{
// Connection name
- protected String connectionName;
+ protected final String connectionName;
// Connection manager
- protected IRepositoryConnectionManager connMgr;
- // Output connection name
- protected String outputConnectionName;
+ protected final IRepositoryConnectionManager connMgr;
/** Constructor */
- public OutputActivity(String connectionName, IRepositoryConnectionManager connMgr, String outputConnectionName)
+ public OutputActivity(String connectionName, IRepositoryConnectionManager connMgr)
{
this.connectionName = connectionName;
this.connMgr = connMgr;
- this.outputConnectionName = outputConnectionName;
}
/** Record time-stamped information about the activity of the output connector.
@@ -2540,11 +2618,12 @@ public class WorkerThread extends Thread
*@param resultDescription is a (possibly long) human-readable string which adds detail, if required, to the result
* described in the resultCode field. This field is not meant to be queried on. May be null.
*/
+ @Override
public void recordActivity(Long startTime, String activityType, Long dataSize,
String entityURI, String resultCode, String resultDescription)
throws ManifoldCFException
{
- connMgr.recordHistory(connectionName,startTime,ManifoldCF.qualifyOutputActivityName(activityType,outputConnectionName),dataSize,entityURI,resultCode,
+ connMgr.recordHistory(connectionName,startTime,activityType,dataSize,entityURI,resultCode,
resultDescription,null);
}
@@ -2554,6 +2633,7 @@ public class WorkerThread extends Thread
*@param accessToken is the raw, repository access token.
*@return the properly qualified access token.
*/
+ @Override
public String qualifyAccessToken(String authorityNameString, String accessToken)
throws ManifoldCFException
{
@@ -2567,6 +2647,7 @@ public class WorkerThread extends Thread
*@param document is the document data to be processed (handed to the output data store).
*@return the document status (accepted or permanently rejected); return codes are listed in IPipelineConnector.
*/
+ @Override
public int sendDocument(RepositoryDocument document)
throws ManifoldCFException, ServiceInterruption
{
@@ -2574,51 +2655,6 @@ public class WorkerThread extends Thread
return IPipelineConnector.DOCUMENTSTATUS_REJECTED;
}
- /** Detect if a mime type is acceptable downstream or not. This method is used to determine whether it makes sense to fetch a document
- * in the first place.
- *@param mimeType is the mime type of the document.
- *@return true if the mime type can be accepted by the downstream connection.
- */
- public boolean checkMimeTypeIndexable(String mimeType)
- throws ManifoldCFException, ServiceInterruption
- {
- return false;
- }
-
- /** Pre-determine whether a document (passed here as a File object) is acceptable downstream. This method is
- * used to determine whether a document needs to be actually transferred. This hook is provided mainly to support
- * search engines that only handle a small set of accepted file types.
- *@param localFile is the local file to check.
- *@return true if the file is acceptable by the downstream connection.
- */
- public boolean checkDocumentIndexable(File localFile)
- throws ManifoldCFException, ServiceInterruption
- {
- return false;
- }
-
- /** Pre-determine whether a document's length is acceptable downstream. This method is used
- * to determine whether to fetch a document in the first place.
- *@param length is the length of the document.
- *@return true if the file is acceptable by the downstream connection.
- */
- public boolean checkLengthIndexable(long length)
- throws ManifoldCFException, ServiceInterruption
- {
- return false;
- }
-
- /** Pre-determine whether a document's URL is acceptable downstream. This method is used
- * to help filter out documents that cannot be indexed in advance.
- *@param url is the URL of the document.
- *@return true if the file is acceptable by the downstream connection.
- */
- public boolean checkURLIndexable(String url)
- throws ManifoldCFException, ServiceInterruption
- {
- return false;
- }
-
}
}