You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/06/07 13:56:07 UTC
svn commit: r1601092 - in /manifoldcf/branches/CONNECTORS-946:
connectors/nulltransformation/connector/src/main/java/org/apache/manifoldcf/agents/transformation/nullconnector/
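framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/
framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/
framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/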
Author: kwright
Date: Sat Jun 7 11:56:06 2014
New Revision: 1601092
URL: http://svn.apache.org/r1601092
Log:
Add pipeline code
Added:
manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputQualifyActivity.java (with props)
Modified:
manifoldcf/branches/CONNECTORS-946/connectors/nulltransformation/connector/src/main/java/org/apache/manifoldcf/agents/transformation/nullconnector/NullConnector.java
manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputAddActivity.java
manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
Modified: manifoldcf/branches/CONNECTORS-946/connectors/nulltransformation/connector/src/main/java/org/apache/manifoldcf/agents/transformation/nullconnector/NullConnector.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-946/connectors/nulltransformation/connector/src/main/java/org/apache/manifoldcf/agents/transformation/nullconnector/NullConnector.java?rev=1601092&r1=1601091&r2=1601092&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-946/connectors/nulltransformation/connector/src/main/java/org/apache/manifoldcf/agents/transformation/nullconnector/NullConnector.java (original)
+++ manifoldcf/branches/CONNECTORS-946/connectors/nulltransformation/connector/src/main/java/org/apache/manifoldcf/agents/transformation/nullconnector/NullConnector.java Sat Jun 7 11:56:06 2014
@@ -71,7 +71,7 @@ public class NullConnector extends org.a
try
{
long binaryLength = document.getBinaryLength();
- int rval = activities.sendDocument(document);
+ int rval = activities.sendDocument(documentURI,document,authorityNameString);
length = new Long(binaryLength);
resultCode = (rval == DOCUMENTSTATUS_ACCEPTED)?"ACCEPTED":"REJECTED";
return rval;
Modified: manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java?rev=1601092&r1=1601091&r2=1601092&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java (original)
+++ manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java Sat Jun 7 11:56:06 2014
@@ -235,22 +235,22 @@ public class IncrementalIngester extends
IOutputCheckActivity activity)
throws ManifoldCFException, ServiceInterruption
{
- // MHL to handle the pipeline
- IOutputConnection connection = connectionManager.load(outputConnectionName);
- IOutputConnector connector = outputConnectorPool.grab(connection);
- if (connector == null)
- // The connector is not installed; treat this as a service interruption.
- throw new ServiceInterruption("Output connector not installed",0L);
+ PipelineObject pipeline = pipelineGrab(transformationConnectionNames,outputConnectionName,transformationDescriptions,outputDescription);
+ if (pipeline == null)
+ // A connector is not installed; treat this as a service interruption.
+ throw new ServiceInterruption("One or more connectors are not installed",0L);
try
{
- return connector.checkMimeTypeIndexable(outputDescription,mimeType,activity);
+ return pipeline.checkMimeTypeIndexable(mimeType,activity);
}
finally
{
- outputConnectorPool.release(connection,connector);
+ pipeline.release();
}
}
+
+
/** Check if a file is indexable.
*@param outputConnectionName is the name of the output connection associated with this action.
*@param outputDescription is the output description string.
@@ -283,19 +283,17 @@ public class IncrementalIngester extends
IOutputCheckActivity activity)
throws ManifoldCFException, ServiceInterruption
{
- // MHL
- IOutputConnection connection = connectionManager.load(outputConnectionName);
- IOutputConnector connector = outputConnectorPool.grab(connection);
- if (connector == null)
- // The connector is not installed; treat this as a service interruption.
- throw new ServiceInterruption("Output connector not installed",0L);
+ PipelineObject pipeline = pipelineGrab(transformationConnectionNames,outputConnectionName,transformationDescriptions,outputDescription);
+ if (pipeline == null)
+ // A connector is not installed; treat this as a service interruption.
+ throw new ServiceInterruption("One or more connectors are not installed",0L);
try
{
- return connector.checkDocumentIndexable(outputDescription,localFile,activity);
+ return pipeline.checkDocumentIndexable(localFile,activity);
}
finally
{
- outputConnectorPool.release(connection,connector);
+ pipeline.release();
}
}
@@ -333,19 +331,17 @@ public class IncrementalIngester extends
IOutputCheckActivity activity)
throws ManifoldCFException, ServiceInterruption
{
- // MHL
- IOutputConnection connection = connectionManager.load(outputConnectionName);
- IOutputConnector connector = outputConnectorPool.grab(connection);
- if (connector == null)
- // The connector is not installed; treat this as a service interruption.
- throw new ServiceInterruption("Output connector not installed",0L);
+ PipelineObject pipeline = pipelineGrab(transformationConnectionNames,outputConnectionName,transformationDescriptions,outputDescription);
+ if (pipeline == null)
+ // A connector is not installed; treat this as a service interruption.
+ throw new ServiceInterruption("One or more connectors are not installed",0L);
try
{
- return connector.checkLengthIndexable(outputDescription,length,activity);
+ return pipeline.checkLengthIndexable(length,activity);
}
finally
{
- outputConnectorPool.release(connection,connector);
+ pipeline.release();
}
}
@@ -383,19 +379,74 @@ public class IncrementalIngester extends
IOutputCheckActivity activity)
throws ManifoldCFException, ServiceInterruption
{
- // MHL
- IOutputConnection connection = connectionManager.load(outputConnectionName);
- IOutputConnector connector = outputConnectorPool.grab(connection);
- if (connector == null)
- // The connector is not installed; treat this as a service interruption.
- throw new ServiceInterruption("Output connector not installed",0L);
+ PipelineObject pipeline = pipelineGrab(transformationConnectionNames,outputConnectionName,transformationDescriptions,outputDescription);
+ if (pipeline == null)
+ // A connector is not installed; treat this as a service interruption.
+ throw new ServiceInterruption("One or more connectors are not installed",0L);
try
{
- return connector.checkURLIndexable(outputDescription,url,activity);
+ return pipeline.checkURLIndexable(url,activity);
}
finally
{
- outputConnectorPool.release(connection,connector);
+ pipeline.release();
+ }
+ }
+
+
+ /** Grab connectors for the entire pipeline: the transformation connectors, in order, followed by the output connector.
+ *@param transformationConnectionNames - the names of the transformation connections, in order
+ *@param outputConnectionName - the name of the output connection
+ *@param transformationDescriptionStrings - the description strings for the transformations, parallel to transformationConnectionNames
+ *@param outputDescriptionString - the output description string
+ *@return the grabbed pipeline, which the caller must release(), or null if any connector cannot be grabbed (any transformation connectors already grabbed are released first).
+ */
+ protected PipelineObject pipelineGrab(String[] transformationConnectionNames, String outputConnectionName,
+ String[] transformationDescriptionStrings, String outputDescriptionString)
+ throws ManifoldCFException
+ {
+ // Pick up all needed transformation connectors
+ ITransformationConnection[] transformationConnections = new ITransformationConnection[transformationConnectionNames.length];
+
+ for (int i = 0; i < transformationConnections.length; i++)
+ {
+ transformationConnections[i] = transformationConnectionManager.load(transformationConnectionNames[i]);
+ }
+
+ ITransformationConnector[] transformationConnectors = transformationConnectorPool.grabMultiple(transformationConnectionNames,transformationConnections);
+ for (ITransformationConnector c : transformationConnectors)
+ {
+ if (c == null)
+ {
+ transformationConnectorPool.releaseMultiple(transformationConnections,transformationConnectors);
+ return null;
+ }
+ }
+
+ // Last, pick up output connector. If it fails we have to release the transformation connectors.
+ try
+ {
+ IOutputConnection connection = connectionManager.load(outputConnectionName);
+ IOutputConnector connector = outputConnectorPool.grab(connection);
+ if (connector == null)
+ {
+ transformationConnectorPool.releaseMultiple(transformationConnections,transformationConnectors);
+ return null;
+ }
+ return new PipelineObject(transformationConnections,transformationConnectors,connection,connector,
+ transformationDescriptionStrings,outputDescriptionString);
+ }
+ catch (Throwable e) // release transformation connectors on any failure, then rethrow (checked non-ManifoldCF types are wrapped)
+ {
+ transformationConnectorPool.releaseMultiple(transformationConnections,transformationConnectors);
+ if (e instanceof ManifoldCFException)
+ throw (ManifoldCFException)e;
+ else if (e instanceof RuntimeException)
+ throw (RuntimeException)e;
+ else if (e instanceof Error)
+ throw (Error)e;
+ else
+ throw new RuntimeException("Unexpected exception type: "+e.getClass().getName()+": "+e.getMessage(),e);
}
}
@@ -472,10 +523,6 @@ public class IncrementalIngester extends
long recordTime, IOutputActivity activities)
throws ManifoldCFException, ServiceInterruption
{
- activities = new OutputActivitiesWrapper(activities,outputConnectionName);
-
- IOutputConnection connection = connectionManager.load(outputConnectionName);
-
String docKey = makeKey(identifierClass,identifierHash);
if (Logging.ingest.isDebugEnabled())
@@ -483,7 +530,14 @@ public class IncrementalIngester extends
Logging.ingest.debug("Recording document '"+docKey+"' for output connection '"+outputConnectionName+"'");
}
- performIngestion(connection,docKey,documentVersion,null,null,null,null,recordTime,null,activities);
+ performIngestion(new String[0],new String[0],
+ outputConnectionName,null,
+ docKey,documentVersion,null,
+ null,
+ null,
+ recordTime,
+ null,
+ activities);
}
/** Ingest a document.
@@ -604,26 +658,26 @@ public class IncrementalIngester extends
IOutputActivity activities)
throws ManifoldCFException, ServiceInterruption
{
- activities = new OutputActivitiesWrapper(activities,outputConnectionName);
-
- // MHL
- IOutputConnection connection = connectionManager.load(outputConnectionName);
-
String docKey = makeKey(identifierClass,identifierHash);
if (Logging.ingest.isDebugEnabled())
{
Logging.ingest.debug("Ingesting document '"+docKey+"' into output connection '"+outputConnectionName+"'");
}
-
- return performIngestion(connection,docKey,documentVersion,outputVersion,parameterVersion,authorityName,
- data,ingestTime,documentURI,activities);
+ return performIngestion(transformationConnectionNames,transformationVersions,
+ outputConnectionName,outputVersion,
+ docKey,documentVersion,parameterVersion,
+ authorityName,
+ data,
+ ingestTime,documentURI,
+ activities);
}
-
/** Do the actual ingestion, or just record it if there's nothing to ingest. */
- protected boolean performIngestion(IOutputConnection connection,
- String docKey, String documentVersion, String outputVersion, String parameterVersion,
+ protected boolean performIngestion(
+ String[] transformationConnectionNames, String[] transformationVersions,
+ String outputConnectionName, String outputVersion,
+ String docKey, String documentVersion, String parameterVersion,
String authorityNameString,
RepositoryDocument data,
long ingestTime, String documentURI,
@@ -651,7 +705,7 @@ public class IncrementalIngester extends
ArrayList list = new ArrayList();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(docKeyField,docKey),
- new UnitaryClause(outputConnNameField,connection.getName())});
+ new UnitaryClause(outputConnNameField,outputConnectionName)});
IResultSet set = performQuery("SELECT "+docURIField+","+uriHashField+","+lastOutputVersionField+" FROM "+getTableName()+
" WHERE "+query,list,null,null);
@@ -697,9 +751,9 @@ public class IncrementalIngester extends
String[] lockArray = new String[uriCount];
uriCount = 0;
if (documentURI != null)
- lockArray[uriCount++] = connection.getName()+":"+documentURI;
+ lockArray[uriCount++] = outputConnectionName+":"+documentURI;
if (oldURI != null && (documentURI == null || !documentURI.equals(oldURI)))
- lockArray[uriCount++] = connection.getName()+":"+oldURI;
+ lockArray[uriCount++] = outputConnectionName+":"+oldURI;
lockManager.enterCriticalSections(null,null,lockArray);
try
@@ -713,10 +767,11 @@ public class IncrementalIngester extends
list.clear();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(uriHashField,"=",oldURIHash),
- new UnitaryClause(outputConnNameField,"=",connection.getName())});
+ new UnitaryClause(outputConnNameField,"=",outputConnectionName)});
list.add(docKey);
performDelete("WHERE "+query+" AND "+docKeyField+"!=?",list,null);
- removeDocument(connection,oldURI,oldOutputVersion,activities);
+ IOutputConnection connection = connectionManager.load(outputConnectionName);
+ removeDocument(connection,oldURI,oldOutputVersion,new OutputRemoveActivitiesWrapper(activities,outputConnectionName));
}
if (documentURI != null)
@@ -725,7 +780,7 @@ public class IncrementalIngester extends
list.clear();
String query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(uriHashField,"=",documentURIHash),
- new UnitaryClause(outputConnNameField,"=",connection.getName())});
+ new UnitaryClause(outputConnNameField,"=",outputConnectionName)});
list.add(docKey);
performDelete("WHERE "+query+" AND "+ docKeyField+"!=?",list,null);
}
@@ -750,15 +805,21 @@ public class IncrementalIngester extends
// This is a marker that says "something is there"; it has an empty version, which indicates
// that we don't know anything about it. That means it will be reingested when the
// next version comes along, and will be deleted if called for also.
- noteDocumentIngest(connection.getName(),docKey,null,null,null,null,ingestTime,documentURI,documentURIHash);
- int result = addOrReplaceDocument(connection,documentURI,outputVersion,data,authorityNameString,activities);
- noteDocumentIngest(connection.getName(),docKey,documentVersion,outputVersion,parameterVersion,authorityNameString,ingestTime,documentURI,documentURIHash);
+ // MHL -- needs to record info about transformations!!
+ noteDocumentIngest(outputConnectionName,docKey,null,null,null,null,ingestTime,documentURI,documentURIHash);
+ int result = addOrReplaceDocument(transformationConnectionNames,transformationVersions,
+ outputConnectionName,outputVersion,
+ documentURI,data,authorityNameString,
+ activities);
+ // MHL -- needs to record info about transformations!!
+ noteDocumentIngest(outputConnectionName,docKey,documentVersion,outputVersion,parameterVersion,authorityNameString,ingestTime,documentURI,documentURIHash);
return result == IOutputConnector.DOCUMENTSTATUS_ACCEPTED;
}
// If we get here, it means we are noting that the document was examined, but that no change was required. This is signaled
// to noteDocumentIngest by having the null documentURI.
- noteDocumentIngest(connection.getName(),docKey,documentVersion,outputVersion,parameterVersion,authorityNameString,ingestTime,null,null);
+ // MHL -- needs to record info about transformations!!
+ noteDocumentIngest(outputConnectionName,docKey,documentVersion,outputVersion,parameterVersion,authorityNameString,ingestTime,null,null);
return true;
}
finally
@@ -1875,24 +1936,29 @@ public class IncrementalIngester extends
/** Add or replace document, using the specified output connection, via the standard pool.
*/
- protected int addOrReplaceDocument(IOutputConnection connection, String documentURI, String outputDescription,
- RepositoryDocument document, String authorityNameString,
- IOutputAddActivity activities)
+ protected int addOrReplaceDocument(
+ String[] transformationConnectionNames, String[] transformationDescriptionStrings,
+ String outputConnectionName, String outputDescriptionString,
+ String documentURI, RepositoryDocument document, String authorityNameString,
+ IOutputAddActivity finalActivities)
throws ManifoldCFException, ServiceInterruption
{
// Set indexing date
document.setIndexingDate(new Date());
- IOutputConnector connector = outputConnectorPool.grab(connection);
- if (connector == null)
- // The connector is not installed; treat this as a service interruption.
- throw new ServiceInterruption("Output connector not installed",0L);
+
+ // Grab the whole pipeline (transformations, then output); it is released in the finally clause below
+ PipelineObject pipeline = pipelineGrab(transformationConnectionNames,outputConnectionName,
+ transformationDescriptionStrings,outputDescriptionString);
+ if (pipeline == null)
+ // A connector is not installed; treat this as a service interruption.
+ throw new ServiceInterruption("Pipeline connector not installed",0L);
try
{
- return connector.addOrReplaceDocument(documentURI,outputDescription,document,authorityNameString,activities);
+ return pipeline.addOrReplaceDocument(documentURI,document,authorityNameString,finalActivities);
}
finally
{
- outputConnectorPool.release(connection,connector);
+ pipeline.release();
}
}
@@ -1983,6 +2049,45 @@ public class IncrementalIngester extends
}
+ /** Wrapper class for history activity. This handles conversion of transformation connector activity logging to
+ * activity names qualified with the transformation connection name */
+ protected static class TransformationRecordingActivity implements IOutputHistoryActivity
+ {
+ protected final IOutputHistoryActivity activityProvider;
+ protected final String transformationConnectionName;
+
+ public TransformationRecordingActivity(IOutputHistoryActivity activityProvider, String transformationConnectionName)
+ {
+ this.activityProvider = activityProvider;
+ this.transformationConnectionName = transformationConnectionName;
+ }
+
+ /** Record time-stamped information about the activity of the transformation connector, forwarding it to the wrapped activity with the activity type qualified by the transformation connection name.
+ *@param startTime is either null or the time since the start of epoch in milliseconds (Jan 1, 1970). Every
+ * activity has an associated time; the startTime field records when the activity began. A null value
+ * indicates that the start time and the finishing time are the same.
+ *@param activityType is a string which is fully interpretable only in the context of the connector involved, which is
+ * used to categorize what kind of activity is being recorded. For example, a web connector might record a
+ * "fetch document" activity. Cannot be null.
+ *@param dataSize is the number of bytes of data involved in the activity, or null if not applicable.
+ *@param entityURI is a (possibly long) string which identifies the object involved in the history record.
+ * The interpretation of this field will differ from connector to connector. May be null.
+ *@param resultCode contains a terse description of the result of the activity. The description is limited in
+ * size to 255 characters, and can be interpreted only in the context of the current connector. May be null.
+ *@param resultDescription is a (possibly long) human-readable string which adds detail, if required, to the result
+ * described in the resultCode field. This field is not meant to be queried on. May be null.
+ */
+ @Override
+ public void recordActivity(Long startTime, String activityType, Long dataSize,
+ String entityURI, String resultCode, String resultDescription)
+ throws ManifoldCFException
+ {
+ activityProvider.recordActivity(startTime,ManifoldCF.qualifyTransformationActivityName(activityType,transformationConnectionName),
+ dataSize,entityURI,resultCode,resultDescription);
+ }
+
+ }
+
protected static class OutputRemoveActivitiesWrapper extends OutputRecordingActivity implements IOutputRemoveActivity
{
protected final IOutputRemoveActivity activities;
@@ -1995,11 +2100,11 @@ public class IncrementalIngester extends
}
- protected static class OutputActivitiesWrapper extends OutputRecordingActivity implements IOutputActivity
+ protected static class OutputAddActivitiesWrapper extends OutputRecordingActivity implements IOutputAddActivity
{
- protected final IOutputActivity activities;
+ protected final IOutputAddActivity activities;
- public OutputActivitiesWrapper(IOutputActivity activities, String outputConnectionName)
+ public OutputAddActivitiesWrapper(IOutputAddActivity activities, String outputConnectionName)
{
super(activities,outputConnectionName);
this.activities = activities;
@@ -2019,13 +2124,15 @@ public class IncrementalIngester extends
}
/** Send a document via the pipeline to the next output connection.
+ *@param documentURI is the document's URI; it is passed unchanged to the wrapped activity.
*@param document is the document data to be processed (handed to the output data store).
+ *@param authorityNameString is the authority name string that should be used to qualify the document's access tokens.
*@return the document status (accepted or permanently rejected); return codes are listed in IPipelineConnector.
*/
- public int sendDocument(RepositoryDocument document)
+ public int sendDocument(String documentURI, RepositoryDocument document, String authorityNameString)
throws ManifoldCFException, ServiceInterruption
{
- return activities.sendDocument(document);
+ return activities.sendDocument(documentURI,document,authorityNameString);
}
/** Detect if a mime type is acceptable downstream or not. This method is used to determine whether it makes sense to fetch a document
@@ -2079,4 +2186,370 @@ public class IncrementalIngester extends
}
+ protected static class OutputActivitiesWrapper extends OutputAddActivitiesWrapper implements IOutputActivity // wraps a full IOutputActivity; the output connection name is handed to the superclass
+ {
+ protected final IOutputActivity activities; // NOTE(review): hides the superclass field 'activities' (typed IOutputAddActivity); both hold the same object
+
+ public OutputActivitiesWrapper(IOutputActivity activities, String outputConnectionName)
+ {
+ super(activities,outputConnectionName);
+ this.activities = activities;
+ }
+ }
+
+ protected class PipelineObject // connectors grabbed for one pipeline (transformations in order, then output); callers must release() when done
+ {
+ public final IOutputConnection outputConnection;
+ public final IOutputConnector outputConnector;
+ public final ITransformationConnection[] transformationConnections;
+ public final ITransformationConnector[] transformationConnectors;
+ public final String outputDescription;
+ public final String[] transformationDescriptions;
+
+ public PipelineObject(
+ ITransformationConnection[] transformationConnections, ITransformationConnector[] transformationConnectors,
+ IOutputConnection outputConnection, IOutputConnector outputConnector,
+ String[] transformationDescriptions, String outputDescription)
+ {
+ this.transformationConnections = transformationConnections;
+ this.transformationConnectors = transformationConnectors;
+ this.outputConnection = outputConnection;
+ this.outputConnector = outputConnector;
+ this.outputDescription = outputDescription;
+ this.transformationDescriptions = transformationDescriptions;
+ }
+
+ public boolean checkMimeTypeIndexable(String mimeType, IOutputCheckActivity finalActivity)
+ throws ManifoldCFException, ServiceInterruption
+ {
+ PipelineCheckEntryPoint entryPoint = buildCheckPipeline(finalActivity);
+ return entryPoint.getPipelineConnector().checkMimeTypeIndexable(entryPoint.getPipelineDescriptionString(),mimeType,entryPoint.getPipelineCheckActivity());
+ }
+
+ public boolean checkDocumentIndexable(File localFile, IOutputCheckActivity finalActivity)
+ throws ManifoldCFException, ServiceInterruption
+ {
+ PipelineCheckEntryPoint entryPoint = buildCheckPipeline(finalActivity);
+ return entryPoint.getPipelineConnector().checkDocumentIndexable(entryPoint.getPipelineDescriptionString(),localFile,entryPoint.getPipelineCheckActivity());
+ }
+
+ public boolean checkLengthIndexable(long length, IOutputCheckActivity finalActivity)
+ throws ManifoldCFException, ServiceInterruption
+ {
+ PipelineCheckEntryPoint entryPoint = buildCheckPipeline(finalActivity);
+ return entryPoint.getPipelineConnector().checkLengthIndexable(entryPoint.getPipelineDescriptionString(),length,entryPoint.getPipelineCheckActivity());
+ }
+
+ public boolean checkURLIndexable(String uri, IOutputCheckActivity finalActivity)
+ throws ManifoldCFException, ServiceInterruption
+ {
+ PipelineCheckEntryPoint entryPoint = buildCheckPipeline(finalActivity);
+ return entryPoint.getPipelineConnector().checkURLIndexable(entryPoint.getPipelineDescriptionString(),uri,entryPoint.getPipelineCheckActivity());
+ }
+
+ public int addOrReplaceDocument(String documentURI, RepositoryDocument document, String authorityNameString, IOutputAddActivity finalActivity)
+ throws ManifoldCFException, ServiceInterruption
+ {
+ PipelineAddEntryPoint entryPoint = buildAddPipeline(finalActivity);
+ return entryPoint.getPipelineConnector().addOrReplaceDocument(documentURI,entryPoint.getPipelineDescriptionString(),
+ document,authorityNameString,entryPoint.getPipelineAddActivity());
+ }
+
+ public void release() // return every grabbed connector to its pool
+ throws ManifoldCFException
+ { // try/finally: transformation connectors are released even if releasing the output connector throws
+ try { outputConnectorPool.release(outputConnection,outputConnector); }
+ finally { transformationConnectorPool.releaseMultiple(transformationConnections,transformationConnectors); }
+ }
+
+ protected PipelineCheckEntryPoint buildCheckPipeline(IOutputCheckActivity finalActivity) // chain stages from the output back to the first transformation; returns the first stage
+ {
+ // Build output stage first
+ PipelineCheckEntryPoint currentStage = new PipelineCheckEntryPoint(outputConnector,outputDescription,finalActivity);
+ // Go through transformations backwards
+ int i = transformationConnectors.length;
+ while (i > 0)
+ {
+ i--;
+ currentStage = new PipelineCheckEntryPoint(transformationConnectors[i],transformationDescriptions[i],
+ new PipelineCheckActivity(currentStage.getPipelineConnector(),currentStage.getPipelineDescriptionString(),currentStage.getPipelineCheckActivity()));
+ }
+ return currentStage;
+ }
+
+ protected PipelineAddEntryPoint buildAddPipeline(IOutputAddActivity finalActivity) // same chaining as buildCheckPipeline; each transformation stage records history under its own connection name
+ {
+ // Build output stage first
+ PipelineAddEntryPoint currentStage = new PipelineAddEntryPoint(outputConnector,outputDescription,
+ new OutputAddActivitiesWrapper(finalActivity,outputConnection.getName()));
+ // Go through transformations backwards
+ int i = transformationConnectors.length;
+ while (i > 0)
+ {
+ i--;
+ currentStage = new PipelineAddEntryPoint(transformationConnectors[i],transformationDescriptions[i],
+ new PipelineAddActivity(currentStage.getPipelineConnector(),currentStage.getPipelineDescriptionString(),currentStage.getPipelineAddActivity(),
+ new TransformationRecordingActivity(finalActivity,transformationConnections[i].getName()),finalActivity));
+ }
+ return currentStage;
+ }
+
+ }
+
+ /** This class describes the entry stage of a check pipeline: the first connector to call, its description string, and the check activity to pass to it.
+ */
+ public static class PipelineCheckEntryPoint
+ {
+ protected final IPipelineConnector pipelineConnector;
+ protected final String pipelineDescriptionString;
+ protected final IOutputCheckActivity checkActivity;
+
+ public PipelineCheckEntryPoint(IPipelineConnector pipelineConnector,
+ String pipelineDescriptionString,
+ IOutputCheckActivity checkActivity)
+ {
+ this.pipelineConnector = pipelineConnector;
+ this.pipelineDescriptionString = pipelineDescriptionString;
+ this.checkActivity = checkActivity;
+ }
+
+ public IPipelineConnector getPipelineConnector()
+ {
+ return pipelineConnector;
+ }
+
+ public String getPipelineDescriptionString()
+ {
+ return pipelineDescriptionString;
+ }
+
+ public IOutputCheckActivity getPipelineCheckActivity()
+ {
+ return checkActivity;
+ }
+ }
+
+ /** This class is used to join together pipeline stages for check operations: each check is forwarded to the next stage's connector, with that stage's description string and check activity */
+ public static class PipelineCheckActivity implements IOutputCheckActivity
+ {
+ protected final IPipelineConnector pipelineConnector;
+ protected final String pipelineDescriptionString;
+ protected final IOutputCheckActivity checkActivity;
+
+ public PipelineCheckActivity(IPipelineConnector pipelineConnector, String pipelineDescriptionString, IOutputCheckActivity checkActivity)
+ {
+ this.pipelineConnector = pipelineConnector;
+ this.pipelineDescriptionString = pipelineDescriptionString;
+ this.checkActivity = checkActivity;
+ }
+
+ /** Detect if a mime type is acceptable downstream or not. This method is used to determine whether it makes sense to fetch a document
+ * in the first place.
+ *@param mimeType is the mime type of the document.
+ *@return true if the mime type can be accepted by the downstream connection.
+ */
+ @Override
+ public boolean checkMimeTypeIndexable(String mimeType)
+ throws ManifoldCFException, ServiceInterruption
+ {
+ return pipelineConnector.checkMimeTypeIndexable(pipelineDescriptionString,mimeType,checkActivity);
+ }
+
+ /** Pre-determine whether a document (passed here as a File object) is acceptable downstream. This method is
+ * used to determine whether a document needs to be actually transferred. This hook is provided mainly to support
+ * search engines that only handle a small set of accepted file types.
+ *@param localFile is the local file to check.
+ *@return true if the file is acceptable by the downstream connection.
+ */
+ @Override
+ public boolean checkDocumentIndexable(File localFile)
+ throws ManifoldCFException, ServiceInterruption
+ {
+ return pipelineConnector.checkDocumentIndexable(pipelineDescriptionString,localFile,checkActivity);
+ }
+
+ /** Pre-determine whether a document's length is acceptable downstream. This method is used
+ * to determine whether to fetch a document in the first place.
+ *@param length is the length of the document.
+ *@return true if the file is acceptable by the downstream connection.
+ */
+ @Override
+ public boolean checkLengthIndexable(long length)
+ throws ManifoldCFException, ServiceInterruption
+ {
+ return pipelineConnector.checkLengthIndexable(pipelineDescriptionString,length,checkActivity);
+ }
+
+ /** Pre-determine whether a document's URL is acceptable downstream. This method is used
+ * to help filter out documents that cannot be indexed in advance.
+ *@param url is the URL of the document.
+ *@return true if the file is acceptable by the downstream connection.
+ */
+ @Override
+ public boolean checkURLIndexable(String url)
+ throws ManifoldCFException, ServiceInterruption
+ {
+ return pipelineConnector.checkURLIndexable(pipelineDescriptionString,url,checkActivity);
+ }
+
+ }
+
+ /** This class describes the entry stage of an add pipeline: the first connector to call, its description string, and the add activity to pass to it.
+ */
+ public static class PipelineAddEntryPoint
+ {
+ protected final IPipelineConnector pipelineConnector;
+ protected final String pipelineDescriptionString;
+ protected final IOutputAddActivity addActivity;
+
+ public PipelineAddEntryPoint(IPipelineConnector pipelineConnector,
+ String pipelineDescriptionString,
+ IOutputAddActivity addActivity)
+ {
+ this.pipelineConnector = pipelineConnector;
+ this.pipelineDescriptionString = pipelineDescriptionString;
+ this.addActivity = addActivity;
+ }
+
+ public IPipelineConnector getPipelineConnector()
+ {
+ return pipelineConnector;
+ }
+
+ public String getPipelineDescriptionString()
+ {
+ return pipelineDescriptionString;
+ }
+
+ public IOutputAddActivity getPipelineAddActivity()
+ {
+ return addActivity;
+ }
+ }
+
+  /** This class is used to join together pipeline stages for add operations.
+  * Document checks and document submission are forwarded to the next stage's pipeline connector,
+  * together with this stage's pipeline description string and add activity. Access-token
+  * qualification and history recording are not staged; they go directly to the final
+  * activities supplied at construction.
+  */
+  public static class PipelineAddActivity implements IOutputAddActivity
+  {
+    /** Connector for the next pipeline stage; receives every forwarded check and document. */
+    protected final IPipelineConnector pipelineConnector;
+    /** Pipeline description string handed to pipelineConnector on every forwarded call. */
+    protected final String pipelineDescriptionString;
+    /** Add activity handed to pipelineConnector on every forwarded call. */
+    protected final IOutputAddActivity addActivity;
+    /** History activity that receives every recordActivity() call directly. */
+    protected final IOutputHistoryActivity finalHistoryActivity;
+    /** Qualify activity that receives every qualifyAccessToken() call directly. */
+    protected final IOutputQualifyActivity finalQualifyActivity;
+
+    /** Constructor.
+    *@param pipelineConnector is the connector for the next pipeline stage.
+    *@param pipelineDescriptionString is the pipeline description string passed to that connector.
+    *@param addActivity is the add activity passed to that connector.
+    *@param finalHistoryActivity is the activity that history records are written to.
+    *@param finalQualifyActivity is the activity used to qualify access tokens.
+    */
+    public PipelineAddActivity(IPipelineConnector pipelineConnector,
+      String pipelineDescriptionString,
+      IOutputAddActivity addActivity,
+      IOutputHistoryActivity finalHistoryActivity,
+      IOutputQualifyActivity finalQualifyActivity)
+    {
+      this.pipelineConnector = pipelineConnector;
+      this.pipelineDescriptionString = pipelineDescriptionString;
+      this.addActivity = addActivity;
+      this.finalHistoryActivity = finalHistoryActivity;
+      this.finalQualifyActivity = finalQualifyActivity;
+    }
+
+    /** Detect if a mime type is acceptable downstream or not. This method is used to determine whether it makes sense to fetch a document
+    * in the first place.
+    *@param mimeType is the mime type of the document.
+    *@return true if the mime type can be accepted by the downstream connection.
+    */
+    @Override
+    public boolean checkMimeTypeIndexable(String mimeType)
+      throws ManifoldCFException, ServiceInterruption
+    {
+      return pipelineConnector.checkMimeTypeIndexable(pipelineDescriptionString,mimeType,addActivity);
+    }
+
+    /** Pre-determine whether a document (passed here as a File object) is acceptable downstream. This method is
+    * used to determine whether a document needs to be actually transferred. This hook is provided mainly to support
+    * search engines that only handle a small set of accepted file types.
+    *@param localFile is the local file to check.
+    *@return true if the file is acceptable by the downstream connection.
+    */
+    @Override
+    public boolean checkDocumentIndexable(File localFile)
+      throws ManifoldCFException, ServiceInterruption
+    {
+      return pipelineConnector.checkDocumentIndexable(pipelineDescriptionString,localFile,addActivity);
+    }
+
+    /** Pre-determine whether a document's length is acceptable downstream. This method is used
+    * to determine whether to fetch a document in the first place.
+    *@param length is the length of the document.
+    *@return true if the file is acceptable by the downstream connection.
+    */
+    @Override
+    public boolean checkLengthIndexable(long length)
+      throws ManifoldCFException, ServiceInterruption
+    {
+      return pipelineConnector.checkLengthIndexable(pipelineDescriptionString,length,addActivity);
+    }
+
+    /** Pre-determine whether a document's URL is acceptable downstream. This method is used
+    * to help filter out documents that cannot be indexed in advance.
+    *@param url is the URL of the document.
+    *@return true if the file is acceptable by the downstream connection.
+    */
+    @Override
+    public boolean checkURLIndexable(String url)
+      throws ManifoldCFException, ServiceInterruption
+    {
+      return pipelineConnector.checkURLIndexable(pipelineDescriptionString,url,addActivity);
+    }
+
+    /** Send a document via the pipeline to the next output connection.
+    *@param documentURI is the document's URI.
+    *@param document is the document data to be processed (handed to the output data store).
+    *@param authorityNameString is the authority name string that should be used to qualify the document's access tokens.
+    *@return the document status (accepted or permanently rejected); return codes are listed in IPipelineConnector.
+    */
+    // @Override added: this implements IOutputAddActivity.sendDocument, and the annotation makes the
+    // compiler catch any future drift between this signature and the interface's.
+    @Override
+    public int sendDocument(String documentURI, RepositoryDocument document, String authorityNameString)
+      throws ManifoldCFException, ServiceInterruption
+    {
+      // This goes to the next pipeline stage.
+      return pipelineConnector.addOrReplaceDocument(documentURI,pipelineDescriptionString,
+        document,authorityNameString,addActivity);
+    }
+
+    /** Qualify an access token appropriately, to match access tokens as returned by mod_aa. This method
+    * includes the authority name with the access token, if any, so that each authority may establish its own token space.
+    *@param authorityNameString is the name of the authority to use to qualify the access token.
+    *@param accessToken is the raw, repository access token.
+    *@return the properly qualified access token.
+    */
+    @Override
+    public String qualifyAccessToken(String authorityNameString, String accessToken)
+      throws ManifoldCFException
+    {
+      // This functionality does not need to be staged; we just want to vector through to the final stage directly.
+      return finalQualifyActivity.qualifyAccessToken(authorityNameString,accessToken);
+    }
+
+    /** Record time-stamped information about the activity of the output connector.
+    *@param startTime is either null or the time since the start of epoch in milliseconds (Jan 1, 1970). Every
+    *       activity has an associated time; the startTime field records when the activity began. A null value
+    *       indicates that the start time and the finishing time are the same.
+    *@param activityType is a string which is fully interpretable only in the context of the connector involved, which is
+    *       used to categorize what kind of activity is being recorded. For example, a web connector might record a
+    *       "fetch document" activity. Cannot be null.
+    *@param dataSize is the number of bytes of data involved in the activity, or null if not applicable.
+    *@param entityURI is a (possibly long) string which identifies the object involved in the history record.
+    *       The interpretation of this field will differ from connector to connector. May be null.
+    *@param resultCode contains a terse description of the result of the activity. The description is limited in
+    *       size to 255 characters, and can be interpreted only in the context of the current connector. May be null.
+    *@param resultDescription is a (possibly long) human-readable string which adds detail, if required, to the result
+    *       described in the resultCode field. This field is not meant to be queried on. May be null.
+    */
+    // @Override added: this implements the history-recording method inherited via IOutputHistoryActivity.
+    @Override
+    public void recordActivity(Long startTime, String activityType, Long dataSize,
+      String entityURI, String resultCode, String resultDescription)
+      throws ManifoldCFException
+    {
+      // Each stage of the pipeline uses a specific activity for recording history, but it's not fundamentally
+      // pipelined
+      finalHistoryActivity.recordActivity(startTime,activityType,dataSize,entityURI,resultCode,resultDescription);
+    }
+
+  }
+
}
Modified: manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputAddActivity.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputAddActivity.java?rev=1601092&r1=1601091&r2=1601092&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputAddActivity.java (original)
+++ manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputAddActivity.java Sat Jun 7 11:56:06 2014
@@ -24,24 +24,17 @@ import org.apache.manifoldcf.agents.inte
/** This interface abstracts from the activities that an output connector can do
when adding or replacing documents.
*/
-public interface IOutputAddActivity extends IOutputHistoryActivity,IOutputCheckActivity
+public interface IOutputAddActivity extends IOutputQualifyActivity,IOutputHistoryActivity,IOutputCheckActivity
{
public static final String _rcsid = "@(#)$Id: IOutputAddActivity.java 988245 2010-08-23 18:39:35Z kwright $";
- /** Qualify an access token appropriately, to match access tokens as returned by mod_aa. This method
- * includes the authority name with the access token, if any, so that each authority may establish its own token space.
- *@param authorityNameString is the name of the authority to use to qualify the access token.
- *@param accessToken is the raw, repository access token.
- *@return the properly qualified access token.
- */
- public String qualifyAccessToken(String authorityNameString, String accessToken)
- throws ManifoldCFException;
-
+ // qualifyAccessToken() is no longer declared here; it is now inherited from IOutputQualifyActivity (see the extends clause).
+
/** Send a document via the pipeline to the next output connection.
+ *@param documentURI is the document's URI.
*@param document is the document data to be processed (handed to the output data store).
+ *@param authorityNameString is the authority name string that should be used to qualify the document's access tokens.
*@return the document status (accepted or permanently rejected); return codes are listed in IPipelineConnector.
*/
- public int sendDocument(RepositoryDocument document)
+ public int sendDocument(String documentURI, RepositoryDocument document, String authorityNameString)
throws ManifoldCFException, ServiceInterruption;
}
Added: manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputQualifyActivity.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputQualifyActivity.java?rev=1601092&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputQualifyActivity.java (added)
+++ manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputQualifyActivity.java Sat Jun 7 11:56:06 2014
@@ -0,0 +1,39 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.agents.interfaces;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.agents.interfaces.*;
+
+/** This interface abstracts from the activities that an output connector can do
+when qualifying access tokens with authority names.
+*/
+public interface IOutputQualifyActivity
+{
+  public static final String _rcsid = "@(#)$Id$";
+
+  /** Qualify an access token appropriately, to match access tokens as returned by mod_aa. This method
+  * includes the authority name with the access token, if any, so that each authority may establish its own token space.
+  *@param authorityNameString is the name of the authority to use to qualify the access token.
+  *@param accessToken is the raw, repository access token.
+  *@return the properly qualified access token.
+  */
+  public String qualifyAccessToken(String authorityNameString, String accessToken)
+    throws ManifoldCFException;
+}
Propchange: manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputQualifyActivity.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: manifoldcf/branches/CONNECTORS-946/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IOutputQualifyActivity.java
------------------------------------------------------------------------------
svn:keywords = Id
Modified: manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1601092&r1=1601091&r2=1601092&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java (original)
+++ manifoldcf/branches/CONNECTORS-946/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java Sat Jun 7 11:56:06 2014
@@ -2643,12 +2643,13 @@ public class WorkerThread extends Thread
return URLEncoder.encode(authorityNameString) + ":" + URLEncoder.encode(accessToken);
}
- /** Send adocument via the pipeline to the next connection.
+ /** Send a document via the pipeline to the next output connection.
+ *@param documentURI is the document's URI.
*@param document is the document data to be processed (handed to the output data store).
+ *@param authorityNameString is the authority name string that should be used to qualify the document's access tokens.
*@return the document status (accepted or permanently rejected); return codes are listed in IPipelineConnector.
*/
- @Override
- public int sendDocument(RepositoryDocument document)
+ public int sendDocument(String documentURI, RepositoryDocument document, String authorityNameString)
throws ManifoldCFException, ServiceInterruption
{
// No downstream connection at output connection level.