You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/12/09 22:51:46 UTC
svn commit: r1644200 - in /manifoldcf/branches/dev_1x: ./ framework/
framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/
framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/
framework/pull-agent/src/main/ja...
Author: kwright
Date: Tue Dec 9 21:51:46 2014
New Revision: 1644200
URL: http://svn.apache.org/r1644200
Log:
Fix for CONNECTORS-1118. Warning: change to the IIncrementalIngester API.
Added:
manifoldcf/branches/dev_1x/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineConnections.java
- copied unchanged from r1644197, manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineConnections.java
manifoldcf/branches/dev_1x/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineConnectionsWithVersions.java
- copied unchanged from r1644197, manifoldcf/trunk/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineConnectionsWithVersions.java
Modified:
manifoldcf/branches/dev_1x/ (props changed)
manifoldcf/branches/dev_1x/CHANGES.txt
manifoldcf/branches/dev_1x/framework/ (props changed)
manifoldcf/branches/dev_1x/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
manifoldcf/branches/dev_1x/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java
manifoldcf/branches/dev_1x/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
Propchange: manifoldcf/branches/dev_1x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 9 21:51:46 2014
@@ -42,6 +42,7 @@
/manifoldcf/branches/CONNECTORS-1089:1635610-1635937
/manifoldcf/branches/CONNECTORS-1100:1637693-1640317
/manifoldcf/branches/CONNECTORS-1104:1640149-1640198
+/manifoldcf/branches/CONNECTORS-1118:1644108-1644196
/manifoldcf/branches/CONNECTORS-120:1406712-1407974,1407982-1411043,1411049-1416451
/manifoldcf/branches/CONNECTORS-120-1:1416450-1417056
/manifoldcf/branches/CONNECTORS-13:1525862-1527182,1539324-1541634
@@ -116,4 +117,4 @@
/manifoldcf/branches/CONNECTORS-981:1605049-1605773
/manifoldcf/branches/CONNECTORS-989:1611600-1612101
/manifoldcf/branches/CONNECTORS-990:1610284-1610707
-/manifoldcf/trunk:1620703,1620748,1620812,1620862,1621449,1621613,1621855,1622213,1622740,1622850,1622853-1622854,1623249,1623251,1623314,1623599,1623951,1623953-1623954,1623956,1623972,1624058,1624085,1624174,1624236,1624377,1624384,1624399,1624449,1624464,1624504,1624729-1624731,1624906,1624909-1624910,1625023,1625095,1625103,1625108,1625264,1625270,1625394,1625400,1625910,1626090,1626097,1626102,1626638-1626639,1626973,1627687,1627690,1627959,1628066,1628106,1628168,1628188,1628699,1628798,1628808,1628845,1628905,1629122,1629374-1629375,1629379,1629541,1629994,1630188,1630535,1630623,1630671,1630812,1630885,1631039,1631162,1631164,1631252,1631750,1631953,1632013,1632225,1632289,1632562,1632844,1632847,1632854,1633062-1633063,1633108,1633193,1633202,1633282,1633284,1633295,1633336,1633339,1633345,1633348,1633364,1633378,1633383,1633432,1633546,1633590,1633634,1633668,1633727,1633760,1633764,1633786,1633910,1633923,1634021,1634028,1634067,1634132,1634145,1634148,1634155,1634188,163
4202,1634264,1634373,1634530,1634688,1634850,1634857,1635103,1635116,1635421,1635438,1635478,1635481,1635484,1635490,1635809,1635939,1636146,1636167,1636180,1636207,1636215,1636232,1636334,1636519,1636570,1636684,1636940,1637011,1637310,1637350,1637364,1637373,1637378,1639259,1639593,1639600,1640018,1640101,1640199,1640204,1640208,1640314,1640319,1640749,1640772,1640805,1640888,1640925,1640941-1640942,1641222,1641328,1641557,1641559,1641629,1641633,1641724,1641754,1641911,1642163,1642255,1642318,1642531,1642650,1642658,1642673,1642716
+/manifoldcf/trunk:1620703,1620748,1620812,1620862,1621449,1621613,1621855,1622213,1622740,1622850,1622853-1622854,1623249,1623251,1623314,1623599,1623951,1623953-1623954,1623956,1623972,1624058,1624085,1624174,1624236,1624377,1624384,1624399,1624449,1624464,1624504,1624729-1624731,1624906,1624909-1624910,1625023,1625095,1625103,1625108,1625264,1625270,1625394,1625400,1625910,1626090,1626097,1626102,1626638-1626639,1626973,1627687,1627690,1627959,1628066,1628106,1628168,1628188,1628699,1628798,1628808,1628845,1628905,1629122,1629374-1629375,1629379,1629541,1629994,1630188,1630535,1630623,1630671,1630812,1630885,1631039,1631162,1631164,1631252,1631750,1631953,1632013,1632225,1632289,1632562,1632844,1632847,1632854,1633062-1633063,1633108,1633193,1633202,1633282,1633284,1633295,1633336,1633339,1633345,1633348,1633364,1633378,1633383,1633432,1633546,1633590,1633634,1633668,1633727,1633760,1633764,1633786,1633910,1633923,1634021,1634028,1634067,1634132,1634145,1634148,1634155,1634188,163
4202,1634264,1634373,1634530,1634688,1634850,1634857,1635103,1635116,1635421,1635438,1635478,1635481,1635484,1635490,1635809,1635939,1636146,1636167,1636180,1636207,1636215,1636232,1636334,1636519,1636570,1636684,1636940,1637011,1637310,1637350,1637364,1637373,1637378,1639259,1639593,1639600,1640018,1640101,1640199,1640204,1640208,1640314,1640319,1640749,1640772,1640805,1640888,1640925,1640941-1640942,1641222,1641328,1641557,1641559,1641629,1641633,1641724,1641754,1641911,1642163,1642255,1642318,1642531,1642650,1642658,1642673,1642716,1644197
Modified: manifoldcf/branches/dev_1x/CHANGES.txt
URL: http://svn.apache.org/viewvc/manifoldcf/branches/dev_1x/CHANGES.txt?rev=1644200&r1=1644199&r2=1644200&view=diff
==============================================================================
--- manifoldcf/branches/dev_1x/CHANGES.txt (original)
+++ manifoldcf/branches/dev_1x/CHANGES.txt Tue Dec 9 21:51:46 2014
@@ -3,6 +3,10 @@ $Id$
======================= 1.8-dev =====================
+CONNECTORS-1118: Change IIncrementalIngester interface to allow
+for cached connection instances.
+(Aeham Abushwashi, Karl Wright)
+
CONNECTORS-974: Make SharePoint 2010 be the default selection.
(Karl Wright)
Propchange: manifoldcf/branches/dev_1x/framework/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 9 21:51:46 2014
@@ -112,4 +112,4 @@
/manifoldcf/branches/CONNECTORS-989/framework:1611600-1612101
/manifoldcf/branches/CONNECTORS-990/framework:1610284-1610707
/manifoldcf/trunk:1629122
-/manifoldcf/trunk/framework:1620703,1620748,1620812,1620862,1621449,1621613,1621855,1622213,1622740,1622850,1622853-1622854,1623249,1623251,1623314,1623599,1623951,1623953-1623954,1623956,1623972,1624058,1624085,1624174,1624236,1624377,1624384,1624399,1624449,1624464,1624504,1624729-1624731,1624906,1624909-1624910,1625023,1625095,1625103,1625108,1625264,1625270,1625394,1625400,1625910,1626090,1626097,1626102,1626638-1626639,1626973,1627687,1627690,1627959,1628066,1628106,1628168,1628188,1628699,1628798,1628808,1628845,1628905,1629122,1629374-1629375,1629379,1629541,1629994,1630188,1630535,1630623,1630671,1630812,1630885,1631039,1631162,1631164,1631252,1632013,1632289,1632844,1633108,1633193,1633202,1633348,1633364,1634145,1634148,1634155,1634264,1634373,1634530,1635438,1635809,1636146,1636180,1636207,1636232,1636334,1636519,1636570,1636684,1636940,1637011,1639593,1639600,1640018,1640101,1640199,1640314,1640319,1640749,1640772,1640925,1640941,1641222,1641557,1641559,1641724,1641911,1
642163,1642255,1642318
+/manifoldcf/trunk/framework:1620703,1620748,1620812,1620862,1621449,1621613,1621855,1622213,1622740,1622850,1622853-1622854,1623249,1623251,1623314,1623599,1623951,1623953-1623954,1623956,1623972,1624058,1624085,1624174,1624236,1624377,1624384,1624399,1624449,1624464,1624504,1624729-1624731,1624906,1624909-1624910,1625023,1625095,1625103,1625108,1625264,1625270,1625394,1625400,1625910,1626090,1626097,1626102,1626638-1626639,1626973,1627687,1627690,1627959,1628066,1628106,1628168,1628188,1628699,1628798,1628808,1628845,1628905,1629122,1629374-1629375,1629379,1629541,1629994,1630188,1630535,1630623,1630671,1630812,1630885,1631039,1631162,1631164,1631252,1632013,1632289,1632844,1633108,1633193,1633202,1633348,1633364,1634145,1634148,1634155,1634264,1634373,1634530,1635438,1635809,1636146,1636180,1636207,1636232,1636334,1636519,1636570,1636684,1636940,1637011,1639593,1639600,1640018,1640101,1640199,1640314,1640319,1640749,1640772,1640925,1640941,1641222,1641557,1641559,1641724,1641911,1
642163,1642255,1642318,1644197
Modified: manifoldcf/branches/dev_1x/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/dev_1x/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java?rev=1644200&r1=1644199&r2=1644200&view=diff
==============================================================================
--- manifoldcf/branches/dev_1x/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java (original)
+++ manifoldcf/branches/dev_1x/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java Tue Dec 9 21:51:46 2014
@@ -256,20 +256,19 @@ public class IncrementalIngester extends
}
/** Check if a date is indexable.
- *@param pipelineSpecification is the pipeline specification.
+ *@param pipelineConnections is the IPipelineConnections object for this pipeline.
*@param date is the date to check.
*@param activity are the activities available to this method.
*@return true if the date is indexable.
*/
@Override
public boolean checkDateIndexable(
- IPipelineSpecification pipelineSpecification,
+ IPipelineConnections pipelineConnections,
Date date,
IOutputCheckActivity activity)
throws ManifoldCFException, ServiceInterruption
{
- PipelineObject pipeline = pipelineGrab(
- new PipelineConnections(pipelineSpecification));
+ PipelineObject pipeline = pipelineGrab(pipelineConnections);
if (pipeline == null)
// A connector is not installed; treat this as a service interruption.
throw new ServiceInterruption("One or more connectors are not installed",0L);
@@ -284,20 +283,19 @@ public class IncrementalIngester extends
}
/** Check if a mime type is indexable.
- *@param pipelineSpecification is the pipeline specification.
+ *@param pipelineConnections is the pipeline connections object for this pipeline.
*@param mimeType is the mime type to check.
*@param activity are the activities available to this method.
*@return true if the mimeType is indexable.
*/
@Override
public boolean checkMimeTypeIndexable(
- IPipelineSpecification pipelineSpecification,
+ IPipelineConnections pipelineConnections,
String mimeType,
IOutputCheckActivity activity)
throws ManifoldCFException, ServiceInterruption
{
- PipelineObject pipeline = pipelineGrab(
- new PipelineConnections(pipelineSpecification));
+ PipelineObject pipeline = pipelineGrab(pipelineConnections);
if (pipeline == null)
// A connector is not installed; treat this as a service interruption.
throw new ServiceInterruption("One or more connectors are not installed",0L);
@@ -312,20 +310,19 @@ public class IncrementalIngester extends
}
/** Check if a file is indexable.
- *@param pipelineSpecification is the pipeline specification.
+ *@param pipelineConnections is the pipeline connections object for this pipeline.
*@param localFile is the local file to check.
*@param activity are the activities available to this method.
*@return true if the local file is indexable.
*/
@Override
public boolean checkDocumentIndexable(
- IPipelineSpecification pipelineSpecification,
+ IPipelineConnections pipelineConnections,
File localFile,
IOutputCheckActivity activity)
throws ManifoldCFException, ServiceInterruption
{
- PipelineObject pipeline = pipelineGrab(
- new PipelineConnections(pipelineSpecification));
+ PipelineObject pipeline = pipelineGrab(pipelineConnections);
if (pipeline == null)
// A connector is not installed; treat this as a service interruption.
throw new ServiceInterruption("One or more connectors are not installed",0L);
@@ -341,20 +338,19 @@ public class IncrementalIngester extends
/** Pre-determine whether a document's length is indexable by this connector. This method is used by participating repository connectors
* to help filter out documents that are too long to be indexable.
- *@param pipelineSpecification is the pipeline specification.
+ *@param pipelineConnections is the pipeline connections object for this pipeline.
*@param length is the length of the document.
*@param activity are the activities available to this method.
*@return true if the file is indexable.
*/
@Override
public boolean checkLengthIndexable(
- IPipelineSpecification pipelineSpecification,
+ IPipelineConnections pipelineConnections,
long length,
IOutputCheckActivity activity)
throws ManifoldCFException, ServiceInterruption
{
- PipelineObject pipeline = pipelineGrab(
- new PipelineConnections(pipelineSpecification));
+ PipelineObject pipeline = pipelineGrab(pipelineConnections);
if (pipeline == null)
// A connector is not installed; treat this as a service interruption.
throw new ServiceInterruption("One or more connectors are not installed",0L);
@@ -370,20 +366,19 @@ public class IncrementalIngester extends
/** Pre-determine whether a document's URL is indexable by this connector. This method is used by participating repository connectors
* to help filter out documents that are not indexable.
- *@param pipelineSpecification is the pipeline specification.
+ *@param pipelineConnections is the pipeline connections object for this pipeline.
*@param url is the url of the document.
*@param activity are the activities available to this method.
*@return true if the file is indexable.
*/
@Override
public boolean checkURLIndexable(
- IPipelineSpecification pipelineSpecification,
+ IPipelineConnections pipelineConnections,
String url,
IOutputCheckActivity activity)
throws ManifoldCFException, ServiceInterruption
{
- PipelineObject pipeline = pipelineGrab(
- new PipelineConnections(pipelineSpecification));
+ PipelineObject pipeline = pipelineGrab(pipelineConnections);
if (pipeline == null)
// A connector is not installed; treat this as a service interruption.
throw new ServiceInterruption("One or more connectors are not installed",0L);
@@ -404,7 +399,7 @@ public class IncrementalIngester extends
*@param outputDescriptionString - the output description string
*@return the pipeline description, or null if any part of the pipeline cannot be grabbed.
*/
- protected PipelineObjectWithVersions pipelineGrabWithVersions(PipelineConnectionsWithVersions pipelineConnections)
+ protected PipelineObjectWithVersions pipelineGrabWithVersions(IPipelineConnectionsWithVersions pipelineConnections)
throws ManifoldCFException
{
// Pick up all needed transformation connectors
@@ -454,7 +449,7 @@ public class IncrementalIngester extends
*@param outputDescriptionString - the output description string
*@return the pipeline description, or null if any part of the pipeline cannot be grabbed.
*/
- protected PipelineObject pipelineGrab(PipelineConnections pipelineConnections)
+ protected PipelineObject pipelineGrab(IPipelineConnections pipelineConnections)
throws ManifoldCFException
{
// Pick up all needed transformation connectors
@@ -711,7 +706,7 @@ public class IncrementalIngester extends
* This method is conceptually similar to documentIngest(), but does not actually take
* a document or allow it to be transformed. If there is a document already
* indexed, it is removed from the index.
- *@param pipelineSpecificationWithVersions is the pipeline specification with already-fetched output versioning information.
+ *@param pipelineConnectionsWithVersions is the pipeline connections with already-fetched output versioning information.
*@param identifierClass is the name of the space in which the identifier hash should be interpreted.
*@param identifierHash is the hashed document identifier.
*@param componentHash is the hashed component identifier, if any.
@@ -723,7 +718,7 @@ public class IncrementalIngester extends
*/
@Override
public void documentNoData(
- IPipelineSpecificationWithVersions pipelineSpecificationWithVersions,
+ IPipelineConnectionsWithVersions pipelineConnectionsWithVersions,
String identifierClass, String identifierHash, String componentHash,
String documentVersion,
String parameterVersion,
@@ -732,13 +727,11 @@ public class IncrementalIngester extends
IOutputActivity activities)
throws ManifoldCFException, ServiceInterruption
{
- PipelineConnectionsWithVersions pipelineConnectionsWithVersions = new PipelineConnectionsWithVersions(pipelineSpecificationWithVersions);
-
String docKey = makeKey(identifierClass,identifierHash);
if (Logging.ingest.isDebugEnabled())
{
- Logging.ingest.debug("Logging empty document '"+docKey+"' component hash "+((componentHash==null)?"(None)":("'"+componentHash+"'"))+" into output connections '"+extractOutputConnectionNames(pipelineSpecificationWithVersions.getPipelineSpecification().getBasicPipelineSpecification())+"'");
+ Logging.ingest.debug("Logging empty document '"+docKey+"' component hash "+((componentHash==null)?"(None)":("'"+componentHash+"'"))+" into output connections '"+extractOutputConnectionNames(pipelineConnectionsWithVersions.getSpecification().getBasicPipelineSpecification())+"'");
}
// Set up a pipeline
@@ -761,7 +754,7 @@ public class IncrementalIngester extends
* method also REMOVES ALL OLD METADATA. When complete, the index will contain only the metadata
* described by the RepositoryDocument object passed to this method.
* ServiceInterruption is thrown if the document ingestion must be rescheduled.
- *@param pipelineSpecificationWithVersions is the pipeline specification with already-fetched output versioning information.
+ *@param pipelineConnectionsWithVersions is the pipeline connections with already-fetched output versioning information.
*@param identifierClass is the name of the space in which the identifier hash should be interpreted.
*@param identifierHash is the hashed document identifier.
*@param componentHash is the hashed component identifier, if any.
@@ -777,7 +770,7 @@ public class IncrementalIngester extends
*/
@Override
public boolean documentIngest(
- IPipelineSpecificationWithVersions pipelineSpecificationWithVersions,
+ IPipelineConnectionsWithVersions pipelineConnectionsWithVersions,
String identifierClass, String identifierHash, String componentHash,
String documentVersion,
String parameterVersion,
@@ -787,13 +780,11 @@ public class IncrementalIngester extends
IOutputActivity activities)
throws ManifoldCFException, ServiceInterruption, IOException
{
- PipelineConnectionsWithVersions pipelineConnectionsWithVersions = new PipelineConnectionsWithVersions(pipelineSpecificationWithVersions);
-
String docKey = makeKey(identifierClass,identifierHash);
if (Logging.ingest.isDebugEnabled())
{
- Logging.ingest.debug("Ingesting document '"+docKey+"' component hash "+((componentHash==null)?"(None)":("'"+componentHash+"'"))+" into output connections '"+extractOutputConnectionNames(pipelineSpecificationWithVersions.getPipelineSpecification().getBasicPipelineSpecification())+"'");
+ Logging.ingest.debug("Ingesting document '"+docKey+"' component hash "+((componentHash==null)?"(None)":("'"+componentHash+"'"))+" into output connections '"+extractOutputConnectionNames(pipelineConnectionsWithVersions.getSpecification().getBasicPipelineSpecification())+"'");
}
// Set indexing date
@@ -2607,12 +2598,12 @@ public class IncrementalIngester extends
protected class PipelineObject
{
- public final PipelineConnections pipelineConnections;
+ public final IPipelineConnections pipelineConnections;
public final IOutputConnector[] outputConnectors;
public final ITransformationConnector[] transformationConnectors;
public PipelineObject(
- PipelineConnections pipelineConnections,
+ IPipelineConnections pipelineConnections,
ITransformationConnector[] transformationConnectors,
IOutputConnector[] outputConnectors)
{
@@ -2738,10 +2729,10 @@ public class IncrementalIngester extends
protected class PipelineObjectWithVersions extends PipelineObject
{
- protected final PipelineConnectionsWithVersions pipelineConnectionsWithVersions;
+ protected final IPipelineConnectionsWithVersions pipelineConnectionsWithVersions;
public PipelineObjectWithVersions(
- PipelineConnectionsWithVersions pipelineConnectionsWithVersions,
+ IPipelineConnectionsWithVersions pipelineConnectionsWithVersions,
ITransformationConnector[] transformationConnectors,
IOutputConnector[] outputConnectors)
{
@@ -3700,127 +3691,6 @@ public class IncrementalIngester extends
}
- /** This class caches loaded connections corresponding to a pipeline specification.
- */
- protected class PipelineConnections
- {
- protected final IPipelineSpecification spec;
- protected final String[] transformationConnectionNames;
- protected final ITransformationConnection[] transformationConnections;
- protected final String[] outputConnectionNames;
- protected final IOutputConnection[] outputConnections;
- // We need a way to get from stage index to connection index.
- // These arrays are looked up by stage index, and return the appropriate connection index.
- protected final Map<Integer,Integer> transformationConnectionLookupMap = new HashMap<Integer,Integer>();
- protected final Map<Integer,Integer> outputConnectionLookupMap = new HashMap<Integer,Integer>();
-
- public PipelineConnections(IPipelineSpecification spec)
- throws ManifoldCFException
- {
- this.spec = spec;
- IPipelineSpecificationBasic basicSpec = spec.getBasicPipelineSpecification();
- // Now, load all the connections we'll ever need, being sure to only load one copy of each.
- // We first segregate them into unique transformation and output connections.
- int count = basicSpec.getStageCount();
- Set<String> transformations = new HashSet<String>();
- Set<String> outputs = new HashSet<String>();
- for (int i = 0; i < count; i++)
- {
- if (basicSpec.checkStageOutputConnection(i))
- outputs.add(basicSpec.getStageConnectionName(i));
- else
- transformations.add(basicSpec.getStageConnectionName(i));
- }
-
- Map<String,Integer> transformationNameMap = new HashMap<String,Integer>();
- Map<String,Integer> outputNameMap = new HashMap<String,Integer>();
- transformationConnectionNames = new String[transformations.size()];
- outputConnectionNames = new String[outputs.size()];
- int index = 0;
- for (String connectionName : transformations)
- {
- transformationConnectionNames[index] = connectionName;
- transformationNameMap.put(connectionName,new Integer(index++));
- }
- index = 0;
- for (String connectionName : outputs)
- {
- outputConnectionNames[index] = connectionName;
- outputNameMap.put(connectionName,new Integer(index++));
- }
- // Load!
- transformationConnections = transformationConnectionManager.loadMultiple(transformationConnectionNames);
- outputConnections = connectionManager.loadMultiple(outputConnectionNames);
-
- for (int i = 0; i < count; i++)
- {
- Integer k;
- if (basicSpec.checkStageOutputConnection(i))
- {
- outputConnectionLookupMap.put(new Integer(i),outputNameMap.get(basicSpec.getStageConnectionName(i)));
- }
- else
- {
- transformationConnectionLookupMap.put(new Integer(i),transformationNameMap.get(basicSpec.getStageConnectionName(i)));
- }
- }
- }
-
- public IPipelineSpecification getSpecification()
- {
- return spec;
- }
-
- public String[] getTransformationConnectionNames()
- {
- return transformationConnectionNames;
- }
-
- public ITransformationConnection[] getTransformationConnections()
- {
- return transformationConnections;
- }
-
- public String[] getOutputConnectionNames()
- {
- return outputConnectionNames;
- }
-
- public IOutputConnection[] getOutputConnections()
- {
- return outputConnections;
- }
-
- public Integer getTransformationConnectionIndex(int stage)
- {
- return transformationConnectionLookupMap.get(new Integer(stage));
- }
-
- public Integer getOutputConnectionIndex(int stage)
- {
- return outputConnectionLookupMap.get(new Integer(stage));
- }
-
- }
-
- protected class PipelineConnectionsWithVersions extends PipelineConnections
- {
- protected final IPipelineSpecificationWithVersions pipelineSpecificationWithVersions;
-
- public PipelineConnectionsWithVersions(IPipelineSpecificationWithVersions pipelineSpecificationWithVersions)
- throws ManifoldCFException
- {
- super(pipelineSpecificationWithVersions.getPipelineSpecification());
- this.pipelineSpecificationWithVersions = pipelineSpecificationWithVersions;
- }
-
- public IPipelineSpecificationWithVersions getSpecificationWithVersions()
- {
- return pipelineSpecificationWithVersions;
- }
-
- }
-
/** This class passes everything through, and monitors what happens so that the
* framework can compensate for any transformation connector coding errors.
*/
Modified: manifoldcf/branches/dev_1x/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/dev_1x/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java?rev=1644200&r1=1644199&r2=1644200&view=diff
==============================================================================
--- manifoldcf/branches/dev_1x/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java (original)
+++ manifoldcf/branches/dev_1x/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java Tue Dec 9 21:51:46 2014
@@ -91,63 +91,63 @@ public interface IIncrementalIngester
throws ManifoldCFException, ServiceInterruption;
/** Check if a document date is indexable.
- *@param pipelineSpecification is the pipeline specification.
+ *@param pipelineConnections is the IPipelineConnections object for this pipeline.
*@param date is the date to check
*@param activity are the activities available to this method.
*@return true if the document with that date is indexable.
*/
public boolean checkDateIndexable(
- IPipelineSpecification pipelineSpecification,
+ IPipelineConnections pipelineConnections,
Date date,
IOutputCheckActivity activity)
throws ManifoldCFException, ServiceInterruption;
/** Check if a mime type is indexable.
- *@param pipelineSpecification is the pipeline specification.
+ *@param pipelineConnections is the pipeline connections object for this pipeline.
*@param mimeType is the mime type to check.
*@param activity are the activities available to this method.
*@return true if the mimeType is indexable.
*/
public boolean checkMimeTypeIndexable(
- IPipelineSpecification pipelineSpecification,
+ IPipelineConnections pipelineConnections,
String mimeType,
IOutputCheckActivity activity)
throws ManifoldCFException, ServiceInterruption;
/** Check if a file is indexable.
- *@param pipelineSpecification is the pipeline specification.
+ *@param pipelineConnections is the pipeline connections object for this pipeline.
*@param localFile is the local file to check.
*@param activity are the activities available to this method.
*@return true if the local file is indexable.
*/
public boolean checkDocumentIndexable(
- IPipelineSpecification pipelineSpecification,
+ IPipelineConnections pipelineConnections,
File localFile,
IOutputCheckActivity activity)
throws ManifoldCFException, ServiceInterruption;
/** Pre-determine whether a document's length is indexable by this connector. This method is used by participating repository connectors
* to help filter out documents that are too long to be indexable.
- *@param pipelineSpecification is the pipeline specification.
+ *@param pipelineConnections is the pipeline connections object for this pipeline.
*@param length is the length of the document.
*@param activity are the activities available to this method.
*@return true if the file is indexable.
*/
public boolean checkLengthIndexable(
- IPipelineSpecification pipelineSpecification,
+ IPipelineConnections pipelineConnections,
long length,
IOutputCheckActivity activity)
throws ManifoldCFException, ServiceInterruption;
/** Pre-determine whether a document's URL is indexable by this connector. This method is used by participating repository connectors
* to help filter out documents that are not indexable.
- *@param pipelineSpecification is the pipeline specification.
+ *@param pipelineConnections is the pipeline connections object for this pipeline.
*@param url is the url of the document.
*@param activity are the activities available to this method.
*@return true if the file is indexable.
*/
public boolean checkURLIndexable(
- IPipelineSpecification pipelineSpecification,
+ IPipelineConnections pipelineConnections,
String url,
IOutputCheckActivity activity)
throws ManifoldCFException, ServiceInterruption;
@@ -188,7 +188,7 @@ public interface IIncrementalIngester
* This method is conceptually similar to documentIngest(), but does not actually take
* a document or allow it to be transformed. If there is a document already
* indexed, it is removed from the index.
- *@param pipelineSpecificationWithVersions is the pipeline specification with already-fetched output versioning information.
+ *@param pipelineConnectionsWithVersions is the pipeline connections with already-fetched output versioning information.
*@param identifierClass is the name of the space in which the identifier hash should be interpreted.
*@param identifierHash is the hashed document identifier.
*@param componentHash is the hashed component identifier, if any.
@@ -199,7 +199,7 @@ public interface IIncrementalIngester
*@param activities is an object providing a set of methods that the implementer can use to perform the operation.
*/
public void documentNoData(
- IPipelineSpecificationWithVersions pipelineSpecificationWithVersions,
+ IPipelineConnectionsWithVersions pipelineConnectionsWithVersions,
String identifierClass, String identifierHash, String componentHash,
String documentVersion,
String parameterVersion,
@@ -213,7 +213,7 @@ public interface IIncrementalIngester
* method also REMOVES ALL OLD METADATA. When complete, the index will contain only the metadata
* described by the RepositoryDocument object passed to this method.
* ServiceInterruption is thrown if the document ingestion must be rescheduled.
- *@param pipelineSpecificationWithVersions is the pipeline specification with already-fetched output versioning information.
+ *@param pipelineConnectionsWithVersions is the pipeline connections with already-fetched output versioning information.
*@param identifierClass is the name of the space in which the identifier hash should be interpreted.
*@param identifierHash is the hashed document identifier.
*@param componentHash is the hashed component identifier, if any.
@@ -228,7 +228,7 @@ public interface IIncrementalIngester
*@throws IOException only if data stream throws an IOException.
*/
public boolean documentIngest(
- IPipelineSpecificationWithVersions pipelineSpecificationWithVersions,
+ IPipelineConnectionsWithVersions pipelineConnectionsWithVersions,
String identifierClass, String identifierHash, String componentHash,
String documentVersion,
String parameterVersion,
Modified: manifoldcf/branches/dev_1x/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/dev_1x/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java?rev=1644200&r1=1644199&r2=1644200&view=diff
==============================================================================
--- manifoldcf/branches/dev_1x/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java (original)
+++ manifoldcf/branches/dev_1x/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java Tue Dec 9 21:51:46 2014
@@ -76,6 +76,8 @@ public class WorkerThread extends Thread
IJobManager jobManager = JobManagerFactory.make(threadContext);
IBinManager binManager = BinManagerFactory.make(threadContext);
IRepositoryConnectionManager connMgr = RepositoryConnectionManagerFactory.make(threadContext);
+ ITransformationConnectionManager transformationConnectionManager = TransformationConnectionManagerFactory.make(threadContext);
+ IOutputConnectionManager outputConnectionManager = OutputConnectionManagerFactory.make(threadContext);
IReprioritizationTracker rt = ReprioritizationTrackerFactory.make(threadContext);
IRepositoryConnectorPool repositoryConnectorPool = RepositoryConnectorPoolFactory.make(threadContext);
@@ -357,8 +359,8 @@ public class WorkerThread extends Thread
}
ProcessActivity activity = new ProcessActivity(job.getID(),processID,
- threadContext,rt,jobManager,ingester,
- connectionName,pipelineSpecification,
+ rt,jobManager,ingester,
+ connectionName,pipelineSpecification,transformationConnectionManager,outputConnectionManager,
previousDocuments,
currentTime,
job.getExpiration(),
@@ -1108,6 +1110,181 @@ public class WorkerThread extends Thread
// Nested classes
+ /** Pipeline connections implementation.
+ * Walks the stages of a pipeline specification, collects the distinct transformation and
+ * output connection names, loads each set in a single loadMultiple() call, and records which
+ * loaded connection every stage refers to.
+ */
+ protected static class PipelineConnections implements IPipelineConnections
+ {
+   /** The pipeline specification this object was built from. */
+   protected final IPipelineSpecification spec;
+   /** Distinct transformation connection names, one entry per unique name. */
+   protected final String[] transformationConnectionNames;
+   /** Result of loadMultiple() on transformationConnectionNames.
+   * NOTE(review): the index lookups below assume this array is parallel to the names array -- confirm
+   * against the ITransformationConnectionManager contract. */
+   protected final ITransformationConnection[] transformationConnections;
+   /** Distinct output connection names, one entry per unique name. */
+   protected final String[] outputConnectionNames;
+   /** Result of loadMultiple() on outputConnectionNames.
+   * NOTE(review): assumed parallel to the names array -- confirm against the IOutputConnectionManager contract. */
+   protected final IOutputConnection[] outputConnections;
+   // We need a way to get from stage index to connection index.
+   // These maps are keyed by stage index, and yield the appropriate connection index.
+   // A stage appears in exactly one of the two maps, depending on whether it is an output stage.
+   protected final Map<Integer,Integer> transformationConnectionLookupMap = new HashMap<Integer,Integer>();
+   protected final Map<Integer,Integer> outputConnectionLookupMap = new HashMap<Integer,Integer>();
+
+   /** Constructor.
+   *@param transformationConnectionManager is used to load the transformation connections.
+   *@param outputConnectionManager is used to load the output connections.
+   *@param spec is the pipeline specification whose stages are to be resolved.
+   *@throws ManifoldCFException as declared by the original signature (the calls made here are
+   * project APIs whose individual throws clauses are not visible in this class).
+   */
+   public PipelineConnections(ITransformationConnectionManager transformationConnectionManager,
+     IOutputConnectionManager outputConnectionManager, IPipelineSpecification spec)
+     throws ManifoldCFException
+   {
+     this.spec = spec;
+     IPipelineSpecificationBasic basicSpec = spec.getBasicPipelineSpecification();
+     // Now, load all the connections we'll ever need, being sure to only load one copy of each.
+     // We first segregate them into unique transformation and output connections.
+     int count = basicSpec.getStageCount();
+     Set<String> transformations = new HashSet<String>();
+     Set<String> outputs = new HashSet<String>();
+     for (int i = 0; i < count; i++)
+     {
+       if (basicSpec.checkStageOutputConnection(i))
+         outputs.add(basicSpec.getStageConnectionName(i));
+       else
+         transformations.add(basicSpec.getStageConnectionName(i));
+     }
+
+     // Assign every unique name a slot in the corresponding names array, remembering the slot
+     // so that stages can be mapped to it afterwards.
+     Map<String,Integer> transformationNameMap = new HashMap<String,Integer>();
+     Map<String,Integer> outputNameMap = new HashMap<String,Integer>();
+     transformationConnectionNames = new String[transformations.size()];
+     outputConnectionNames = new String[outputs.size()];
+     int index = 0;
+     for (String connectionName : transformations)
+     {
+       transformationConnectionNames[index] = connectionName;
+       // Integer.valueOf() instead of new Integer(): same value semantics, no needless allocation.
+       transformationNameMap.put(connectionName,Integer.valueOf(index++));
+     }
+     index = 0;
+     for (String connectionName : outputs)
+     {
+       outputConnectionNames[index] = connectionName;
+       outputNameMap.put(connectionName,Integer.valueOf(index++));
+     }
+     // Load!
+     transformationConnections = transformationConnectionManager.loadMultiple(transformationConnectionNames);
+     outputConnections = outputConnectionManager.loadMultiple(outputConnectionNames);
+
+     // Finally, record for each stage which slot its connection occupies.
+     for (int i = 0; i < count; i++)
+     {
+       if (basicSpec.checkStageOutputConnection(i))
+       {
+         outputConnectionLookupMap.put(Integer.valueOf(i),outputNameMap.get(basicSpec.getStageConnectionName(i)));
+       }
+       else
+       {
+         transformationConnectionLookupMap.put(Integer.valueOf(i),transformationNameMap.get(basicSpec.getStageConnectionName(i)));
+       }
+     }
+   }
+
+   /** Get the pipeline specification.
+   *@return the specification passed to the constructor.
+   */
+   @Override
+   public IPipelineSpecification getSpecification()
+   {
+     return spec;
+   }
+
+   /** Get the distinct transformation connection names.
+   *@return the internal names array (not a copy).
+   */
+   @Override
+   public String[] getTransformationConnectionNames()
+   {
+     return transformationConnectionNames;
+   }
+
+   /** Get the loaded transformation connections.
+   *@return the array returned by loadMultiple() for the transformation connection names (not a copy).
+   */
+   @Override
+   public ITransformationConnection[] getTransformationConnections()
+   {
+     return transformationConnections;
+   }
+
+   /** Get the distinct output connection names.
+   *@return the internal names array (not a copy).
+   */
+   @Override
+   public String[] getOutputConnectionNames()
+   {
+     return outputConnectionNames;
+   }
+
+   /** Get the loaded output connections.
+   *@return the array returned by loadMultiple() for the output connection names (not a copy).
+   */
+   @Override
+   public IOutputConnection[] getOutputConnections()
+   {
+     return outputConnections;
+   }
+
+   /** Map a stage index to its transformation connection index.
+   *@param stage is the stage index within the pipeline specification.
+   *@return the index into the transformation connection arrays, or null if no entry was recorded
+   * for the stage (it is an output stage, or lies outside the specification's stage range).
+   */
+   @Override
+   public Integer getTransformationConnectionIndex(int stage)
+   {
+     return transformationConnectionLookupMap.get(Integer.valueOf(stage));
+   }
+
+   /** Map a stage index to its output connection index.
+   *@param stage is the stage index within the pipeline specification.
+   *@return the index into the output connection arrays, or null if no entry was recorded
+   * for the stage (it is a transformation stage, or lies outside the specification's stage range).
+   */
+   @Override
+   public Integer getOutputConnectionIndex(int stage)
+   {
+     return outputConnectionLookupMap.get(Integer.valueOf(stage));
+   }
+
+ }
+
+ /** IPipelineConnectionsWithVersions implementation.
+ * Pairs an existing IPipelineConnections object with a versioned pipeline specification.
+ * Every IPipelineConnections method is forwarded to the wrapped object unchanged.
+ */
+ protected static class PipelineConnectionsWithVersions implements IPipelineConnectionsWithVersions
+ {
+   /** The object all connection queries are forwarded to. */
+   protected final IPipelineConnections pipelineConnections;
+   /** The versioned specification handed back by getSpecificationWithVersions(). */
+   protected final IPipelineSpecificationWithVersions pipelineSpecificationWithVersions;
+
+   /** Constructor.
+   *@param pipelineConnections is the connections object to delegate to.
+   *@param pipelineSpecificationWithVersions is the versioned pipeline specification to expose.
+   */
+   public PipelineConnectionsWithVersions(IPipelineConnections pipelineConnections, IPipelineSpecificationWithVersions pipelineSpecificationWithVersions)
+     throws ManifoldCFException
+   {
+     this.pipelineSpecificationWithVersions = pipelineSpecificationWithVersions;
+     this.pipelineConnections = pipelineConnections;
+   }
+
+   // -- The one method this class adds on top of IPipelineConnections --
+
+   /** Get the versioned pipeline specification.
+   *@return the object passed to the constructor.
+   */
+   @Override
+   public IPipelineSpecificationWithVersions getSpecificationWithVersions()
+   {
+     return pipelineSpecificationWithVersions;
+   }
+
+   // -- Straight delegation to the wrapped IPipelineConnections --
+
+   /** @return the wrapped object's specification. */
+   @Override
+   public IPipelineSpecification getSpecification()
+   {
+     return pipelineConnections.getSpecification();
+   }
+
+   /** @return the wrapped object's transformation connection names. */
+   @Override
+   public String[] getTransformationConnectionNames()
+   {
+     return pipelineConnections.getTransformationConnectionNames();
+   }
+
+   /** @return the wrapped object's output connection names. */
+   @Override
+   public String[] getOutputConnectionNames()
+   {
+     return pipelineConnections.getOutputConnectionNames();
+   }
+
+   /** @return the wrapped object's transformation connections. */
+   @Override
+   public ITransformationConnection[] getTransformationConnections()
+   {
+     return pipelineConnections.getTransformationConnections();
+   }
+
+   /** @return the wrapped object's output connections. */
+   @Override
+   public IOutputConnection[] getOutputConnections()
+   {
+     return pipelineConnections.getOutputConnections();
+   }
+
+   /** @return the wrapped object's answer for the given stage. */
+   @Override
+   public Integer getTransformationConnectionIndex(int stage)
+   {
+     return pipelineConnections.getTransformationConnectionIndex(stage);
+   }
+
+   /** @return the wrapped object's answer for the given stage. */
+   @Override
+   public Integer getOutputConnectionIndex(int stage)
+   {
+     return pipelineConnections.getOutputConnectionIndex(stage);
+   }
+
+ }
+
/** Process activity class wraps access to the ingester and job queue.
*/
protected static class ProcessActivity implements IProcessActivity
@@ -1115,11 +1292,12 @@ public class WorkerThread extends Thread
// Member variables
protected final Long jobID;
protected final String processID;
- protected final IThreadContext threadContext;
protected final IJobManager jobManager;
protected final IIncrementalIngester ingester;
protected final String connectionName;
protected final IPipelineSpecification pipelineSpecification;
+ protected final ITransformationConnectionManager transformationConnectionManager;
+ protected final IOutputConnectionManager outputConnectionManager;
protected final Map<String,QueuedDocument> previousDocuments;
protected final long currentTime;
protected final Long expireInterval;
@@ -1134,6 +1312,9 @@ public class WorkerThread extends Thread
protected final OutputActivity ingestLogger;
protected final IReprioritizationTracker rt;
protected final String parameterVersion;
+
+ protected IPipelineConnections pipelineConnections = null;
+ protected IPipelineConnectionsWithVersions pipelineConnectionsWithVersions = null;
// We submit references in bulk, because that's way more efficient.
protected final Map<DocumentReference,DocumentReference> referenceList = new HashMap<DocumentReference,DocumentReference>();
@@ -1164,16 +1345,23 @@ public class WorkerThread extends Thread
// This represents primary documents.
protected final Set<String> touchedPrimarySet = new HashSet<String>();
+ /** Get the pipeline connections for this activity, building them on first use.
+ * The object is constructed from the transformation and output connection managers and the
+ * pipeline specification held by this activity, then kept in a field and reused.
+ *@return the pipeline connections object.
+ */
+ protected IPipelineConnections getPipelineConnections()
+   throws ManifoldCFException
+ {
+   // Already built on an earlier call: hand back the same instance.
+   if (pipelineConnections != null)
+     return pipelineConnections;
+   pipelineConnections = new PipelineConnections(transformationConnectionManager,outputConnectionManager,pipelineSpecification);
+   return pipelineConnections;
+ }
+
/** Constructor.
*@param jobManager is the job manager
*@param ingester is the ingester
*/
public ProcessActivity(Long jobID, String processID,
- IThreadContext threadContext,
IReprioritizationTracker rt, IJobManager jobManager,
IIncrementalIngester ingester,
String connectionName,
- IPipelineSpecification pipelineSpecification,
+ IPipelineSpecification pipelineSpecification, ITransformationConnectionManager transformationConnectionManager, IOutputConnectionManager outputConnectionManager,
Map<String,QueuedDocument> previousDocuments,
long currentTime,
Long expireInterval,
@@ -1187,12 +1375,13 @@ public class WorkerThread extends Thread
{
this.jobID = jobID;
this.processID = processID;
- this.threadContext = threadContext;
this.rt = rt;
this.jobManager = jobManager;
this.ingester = ingester;
this.connectionName = connectionName;
this.pipelineSpecification = pipelineSpecification;
+ this.transformationConnectionManager = transformationConnectionManager;
+ this.outputConnectionManager = outputConnectionManager;
this.previousDocuments = previousDocuments;
this.currentTime = currentTime;
this.expireInterval = expireInterval;
@@ -1608,7 +1797,7 @@ public class WorkerThread extends Thread
// indicates that it should always be refetched. But I have no way to describe this situation
// in the database at the moment.
ingester.documentIngest(
- computePipelineSpecification(documentIdentifierHash,componentIdentifierHash,documentIdentifier),
+ new PipelineConnectionsWithVersions(getPipelineConnections(),computePipelineSpecification(documentIdentifierHash,componentIdentifierHash,documentIdentifier)),
connectionName,documentIdentifierHash,componentIdentifierHash,
version,parameterVersion,
connection.getACLAuthority(),
@@ -1651,7 +1840,7 @@ public class WorkerThread extends Thread
checkMultipleDispositions(documentIdentifier,componentIdentifier,componentIdentifierHash);
ingester.documentNoData(
- computePipelineSpecification(documentIdentifierHash,componentIdentifierHash,documentIdentifier),
+ new PipelineConnectionsWithVersions(getPipelineConnections(),computePipelineSpecification(documentIdentifierHash,componentIdentifierHash,documentIdentifier)),
connectionName,documentIdentifierHash,componentIdentifierHash,
version,parameterVersion,
connection.getACLAuthority(),
@@ -1962,6 +2151,7 @@ public class WorkerThread extends Thread
long currentTime = System.currentTimeMillis();
+ double currentMinimumDepth = rt.getMinimumDepth();
rt.clearPreloadRequests();
for (int j = 0; j < docidHashes.length; j++)
{
@@ -1974,7 +2164,7 @@ public class WorkerThread extends Thread
// Calculate desired document priority based on current queuetracker status.
String[] bins = ManifoldCF.calculateBins(connector,dr.getLocalIdentifier());
- PriorityCalculator p = new PriorityCalculator(rt,connection,bins);
+ PriorityCalculator p = new PriorityCalculator(rt,currentMinimumDepth,connection,bins);
priorities[j] = p;
p.makePreloadRequest();
}
@@ -2054,7 +2244,7 @@ public class WorkerThread extends Thread
throws ManifoldCFException, ServiceInterruption
{
return ingester.checkDateIndexable(
- pipelineSpecification,date,
+ getPipelineConnections(),date,
ingestLogger);
}
@@ -2067,7 +2257,7 @@ public class WorkerThread extends Thread
throws ManifoldCFException, ServiceInterruption
{
return ingester.checkMimeTypeIndexable(
- pipelineSpecification,mimeType,
+ getPipelineConnections(),mimeType,
ingestLogger);
}
@@ -2080,7 +2270,7 @@ public class WorkerThread extends Thread
throws ManifoldCFException, ServiceInterruption
{
return ingester.checkDocumentIndexable(
- pipelineSpecification,localFile,
+ getPipelineConnections(),localFile,
ingestLogger);
}
@@ -2093,7 +2283,7 @@ public class WorkerThread extends Thread
throws ManifoldCFException, ServiceInterruption
{
return ingester.checkLengthIndexable(
- pipelineSpecification,length,
+ getPipelineConnections(),length,
ingestLogger);
}
@@ -2107,7 +2297,7 @@ public class WorkerThread extends Thread
throws ManifoldCFException, ServiceInterruption
{
return ingester.checkURLIndexable(
- pipelineSpecification,url,
+ getPipelineConnections(),url,
ingestLogger);
}