You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2014/06/14 20:14:43 UTC
svn commit: r1602613 [1/2] - in
/manifoldcf/branches/CONNECTORS-962/framework:
agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/
agents/src/main/java/org/apache/manifoldcf/agents/interfaces/
pull-agent/src/main/java/org/apache/manifo...
Author: kwright
Date: Sat Jun 14 18:14:42 2014
New Revision: 1602613
URL: http://svn.apache.org/r1602613
Log:
Get the code to build again
Added:
manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/OutputKey.java (with props)
manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecification.java (with props)
manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecificationBasic.java (with props)
manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecificationWithVersions.java (with props)
Modified:
manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/DocumentIngestStatus.java
manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java
manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineSpecification.java
manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineSpecificationWithVersions.java
manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/DocumentDescription.java
manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java
manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteThread.java
manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java
manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/QueuedDocument.java
manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/QueuedDocumentSet.java
manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/StufferThread.java
manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
Modified: manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java?rev=1602613&r1=1602612&r2=1602613&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java (original)
+++ manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/incrementalingest/IncrementalIngester.java Sat Jun 14 18:14:42 2014
@@ -216,6 +216,21 @@ public class IncrementalIngester extends
performDelete("",null,null);
}
+ /** From a pipeline specification, get the name of the output connection that will be indexed last
+ * in the pipeline.
+ *@param pipelineSpecificationBasic is the basic pipeline specification.
+ *@return the last indexed output connection name.
+ */
+ @Override
+ public String getLastIndexedOutputConnectionName(IPipelineSpecificationBasic pipelineSpecificationBasic)
+ {
+ // It's always the last in the sequence.
+ int count = pipelineSpecificationBasic.getOutputCount();
+ if (count == 0)
+ return null;
+ return pipelineSpecificationBasic.getStageConnectionName(count-1);
+ }
+
/** Check if a mime type is indexable.
*@param outputConnectionName is the name of the output connection associated with this action.
*@param outputDescription is the output description string.
@@ -558,13 +573,15 @@ public class IncrementalIngester extends
String newParameterVersion,
String newAuthorityNameString)
{
+ IPipelineSpecification pipelineSpecification = pipelineSpecificationWithVersions.getPipelineSpecification();
+ IPipelineSpecificationBasic basicSpecification = pipelineSpecification.getBasicPipelineSpecification();
// Empty document version has a special meaning....
if (newDocumentVersion.length() == 0)
return true;
// Otherwise, cycle through the outputs
- for (int i = 0; i < pipelineSpecificationWithVersions.getOutputCount(); i++)
+ for (int i = 0; i < basicSpecification.getOutputCount(); i++)
{
- int stage = pipelineSpecificationWithVersions.getOutputStage(i);
+ int stage = basicSpecification.getOutputStage(i);
String oldDocumentVersion = pipelineSpecificationWithVersions.getOutputDocumentVersionString(i);
String oldParameterVersion = pipelineSpecificationWithVersions.getOutputParameterVersionString(i);
String oldOutputVersion = pipelineSpecificationWithVersions.getOutputVersionString(i);
@@ -576,11 +593,11 @@ public class IncrementalIngester extends
if (!oldDocumentVersion.equals(newDocumentVersion) ||
!oldParameterVersion.equals(newParameterVersion) ||
!oldAuthorityName.equals(newAuthorityNameString) ||
- !oldOutputVersion.equals(pipelineSpecificationWithVersions.getStageDescriptionString(stage)))
+ !oldOutputVersion.equals(pipelineSpecification.getStageDescriptionString(stage)))
return true;
// Everything matches so far. Next step is to compute a transformation path and corresponding version string.
- String newTransformationVersion = computePackedTransformationVersion(pipelineSpecificationWithVersions,stage);
+ String newTransformationVersion = computePackedTransformationVersion(pipelineSpecification,stage);
if (!pipelineSpecificationWithVersions.getOutputTransformationVersionString(i).equals(newTransformationVersion))
return true;
}
@@ -595,12 +612,13 @@ public class IncrementalIngester extends
*/
protected static String computePackedTransformationVersion(IPipelineSpecification pipelineSpecification, int stage)
{
+ IPipelineSpecificationBasic basicSpecification = pipelineSpecification.getBasicPipelineSpecification();
// First, count the stages we need to represent
int stageCount = 0;
int currentStage = stage;
while (true)
{
- int newStage = pipelineSpecification.getStageParent(currentStage);
+ int newStage = basicSpecification.getStageParent(currentStage);
if (newStage == -1)
break;
stageCount++;
@@ -613,10 +631,10 @@ public class IncrementalIngester extends
currentStage = stage;
while (true)
{
- int newStage = pipelineSpecification.getStageParent(currentStage);
+ int newStage = basicSpecification.getStageParent(currentStage);
if (newStage == -1)
break;
- stageNames[stageCount] = pipelineSpecification.getStageConnectionName(newStage);
+ stageNames[stageCount] = basicSpecification.getStageConnectionName(newStage);
stageDescriptions[stageCount] = pipelineSpecification.getStageDescriptionString(newStage);
stageCount++;
currentStage = newStage;
@@ -1585,22 +1603,20 @@ public class IncrementalIngester extends
throws ManifoldCFException
{
DeleteInfo[] rval = new DeleteInfo[identifierHashes.length];
- HashMap map = new HashMap();
- int i = 0;
- while (i < identifierHashes.length)
+ Map<String,Integer> map = new HashMap<String,Integer>();
+ for (int i = 0; i < identifierHashes.length; i++)
{
map.put(makeKey(identifierClasses[i],identifierHashes[i]),new Integer(i));
rval[i] = null;
- i++;
}
beginTransaction();
try
{
- ArrayList list = new ArrayList();
+ List<String> list = new ArrayList<String>();
int maxCount = maxClauseDocumentURIChunk(outputConnectionName);
int j = 0;
- Iterator iter = map.keySet().iterator();
+ Iterator<String> iter = map.keySet().iterator();
while (iter.hasNext())
{
if (j == maxCount)
@@ -1632,6 +1648,179 @@ public class IncrementalIngester extends
}
}
+ /** Look up ingestion data for a set of documents.
+ *@param rval is a map of output key to document data, in no particular order, which will be loaded with all matching results.
+ *@param pipelineSpecificationBasics are the pipeline specifications corresponding to the identifier classes and hashes.
+ *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
+ *@param identifierHashes is the array of document identifier hashes to look up.
+ */
+ @Override
+ public void getPipelineDocumentIngestDataMultiple(
+ Map<OutputKey,DocumentIngestStatus> rval,
+ IPipelineSpecificationBasic[] pipelineSpecificationBasics,
+ String[] identifierClasses, String[] identifierHashes)
+ throws ManifoldCFException
+ {
+ // Organize by pipeline spec.
+ Map<IPipelineSpecificationBasic,List<Integer>> keyMap = new HashMap<IPipelineSpecificationBasic,List<Integer>>();
+ for (int i = 0; i < pipelineSpecificationBasics.length; i++)
+ {
+ IPipelineSpecificationBasic spec = pipelineSpecificationBasics[i];
+ List<Integer> list = keyMap.get(spec);
+ if (list == null)
+ {
+ list = new ArrayList<Integer>();
+ keyMap.put(spec,list);
+ }
+ list.add(new Integer(i));
+ }
+
+ // Look up each pipeline specification's group of documents, accumulating the results into rval.
+ Iterator<IPipelineSpecificationBasic> iter = keyMap.keySet().iterator();
+ while (iter.hasNext())
+ {
+ IPipelineSpecificationBasic spec = iter.next();
+ List<Integer> list = keyMap.get(spec);
+ String[] localIdentifierClasses = new String[list.size()];
+ String[] localIdentifierHashes = new String[list.size()];
+ for (int i = 0; i < localIdentifierClasses.length; i++)
+ {
+ int index = list.get(i).intValue();
+ localIdentifierClasses[i] = identifierClasses[index];
+ localIdentifierHashes[i] = identifierHashes[index];
+ }
+ getPipelineDocumentIngestDataMultiple(rval,spec,localIdentifierClasses,localIdentifierHashes);
+ }
+ }
+
+ /** Look up ingestion data for a SET of documents.
+ *@param rval is a map of output key to document data, in no particular order, which will be loaded with all matching results.
+ *@param pipelineSpecificationBasic is the pipeline specification for all documents.
+ *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
+ *@param identifierHashes is the array of document identifier hashes to look up.
+ */
+ @Override
+ public void getPipelineDocumentIngestDataMultiple(
+ Map<OutputKey,DocumentIngestStatus> rval,
+ IPipelineSpecificationBasic pipelineSpecificationBasic,
+ String[] identifierClasses, String[] identifierHashes)
+ throws ManifoldCFException
+ {
+ String[] outputConnectionNames = extractOutputConnectionNames(pipelineSpecificationBasic);
+
+ // Build a map, so we can convert an identifier into an array index.
+ Map<String,Integer> indexMap = new HashMap<String,Integer>();
+ for (int i = 0; i < identifierHashes.length; i++)
+ {
+ indexMap.put(makeKey(identifierClasses[i],identifierHashes[i]),new Integer(i));
+ }
+
+ beginTransaction();
+ try
+ {
+ List<String> list = new ArrayList<String>();
+ int maxCount = maxClausePipelineDocumentIngestDataChunk(outputConnectionNames);
+ int j = 0;
+ Iterator<String> iter = indexMap.keySet().iterator();
+ while (iter.hasNext())
+ {
+ if (j == maxCount)
+ {
+ getPipelineDocumentIngestDataChunk(rval,indexMap,outputConnectionNames,list,identifierClasses,identifierHashes);
+ j = 0;
+ list.clear();
+ }
+ list.add(iter.next());
+ j++;
+ }
+ if (j > 0)
+ getPipelineDocumentIngestDataChunk(rval,indexMap,outputConnectionNames,list,identifierClasses,identifierHashes);
+ }
+ catch (ManifoldCFException e)
+ {
+ signalRollback();
+ throw e;
+ }
+ catch (Error e)
+ {
+ signalRollback();
+ throw e;
+ }
+ finally
+ {
+ endTransaction();
+ }
+
+ }
+
+ /** Get a chunk of document ingest data records.
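+ *@param rval is the map of output key to document ingest status where the data should be put.
+ *@param map is the map from document key to index into identifierClasses and identifierHashes.
+ *@param outputConnectionNames are the output connection names the query is restricted to.
+ *@param list is the list of document keys to look up in this chunk.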
+ */
+ protected void getPipelineDocumentIngestDataChunk(Map<OutputKey,DocumentIngestStatus> rval, Map<String,Integer> map, String[] outputConnectionNames, List<String> list,
+ String[] identifierClasses, String[] identifierHashes)
+ throws ManifoldCFException
+ {
+ ArrayList newList = new ArrayList();
+ String query = buildConjunctionClause(newList,new ClauseDescription[]{
+ new MultiClause(docKeyField,list),
+ new MultiClause(outputConnNameField,outputConnectionNames)});
+
+ // Get the primary records associated with this hash value
+ IResultSet set = performQuery("SELECT "+idField+","+outputConnNameField+","+docKeyField+","+lastVersionField+","+lastOutputVersionField+","+authorityNameField+","+forcedParamsField+
+ " FROM "+getTableName()+" WHERE "+query,newList,null,null);
+
+ // Now, go through the original request once more, this time building the result
+ for (int i = 0; i < set.getRowCount(); i++)
+ {
+ IResultRow row = set.getRow(i);
+ String docHash = row.getValue(docKeyField).toString();
+ Integer position = map.get(docHash);
+ if (position != null)
+ {
+ Long id = (Long)row.getValue(idField);
+ String outputConnectionName = (String)row.getValue(outputConnNameField);
+ String lastVersion = (String)row.getValue(lastVersionField);
+ if (lastVersion == null)
+ lastVersion = "";
+ String lastTransformationVersion = (String)row.getValue(lastTransformationVersionField);
+ if (lastTransformationVersion == null)
+ lastTransformationVersion = "";
+ String lastOutputVersion = (String)row.getValue(lastOutputVersionField);
+ if (lastOutputVersion == null)
+ lastOutputVersion = "";
+ String paramVersion = (String)row.getValue(forcedParamsField);
+ if (paramVersion == null)
+ paramVersion = "";
+ String authorityName = (String)row.getValue(authorityNameField);
+ if (authorityName == null)
+ authorityName = "";
+ int indexValue = position.intValue();
+ rval.put(new OutputKey(identifierClasses[indexValue],identifierHashes[indexValue],outputConnectionName),
+ new DocumentIngestStatus(lastVersion,lastTransformationVersion,lastOutputVersion,paramVersion,authorityName));
+ }
+ }
+ }
+
+ /** Look up ingestion data for a document.
+ *@param rval is a map of output key to document data, in no particular order, which will be loaded with all matching results.
+ *@param pipelineSpecificationBasic is the pipeline specification for the document.
+ *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
+ *@param identifierHash is the hash of the id of the document.
+ */
+ @Override
+ public void getPipelineDocumentIngestData(
+ Map<OutputKey,DocumentIngestStatus> rval,
+ IPipelineSpecificationBasic pipelineSpecificationBasic,
+ String identifierClass, String identifierHash)
+ throws ManifoldCFException
+ {
+ getPipelineDocumentIngestDataMultiple(rval,pipelineSpecificationBasic,
+ new String[]{identifierClass},new String[]{identifierHash});
+ }
+
/** Look up ingestion data for a SET of documents.
*@param outputConnectionNames are the names of the output connections associated with this action.
*@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
@@ -1640,50 +1829,45 @@ public class IncrementalIngester extends
* exist in the index.
*/
@Override
+ @Deprecated
public DocumentIngestStatus[] getDocumentIngestDataMultiple(String[] outputConnectionNames,
String[] identifierClasses, String[] identifierHashes)
throws ManifoldCFException
{
// Segregate request by connection names
- HashMap keyMap = new HashMap();
- int i = 0;
- while (i < outputConnectionNames.length)
+ Map<String,List<Integer>> keyMap = new HashMap<String,List<Integer>>();
+ for (int i = 0; i < outputConnectionNames.length; i++)
{
String outputConnectionName = outputConnectionNames[i];
- ArrayList list = (ArrayList)keyMap.get(outputConnectionName);
+ List<Integer> list = keyMap.get(outputConnectionName);
if (list == null)
{
- list = new ArrayList();
+ list = new ArrayList<Integer>();
keyMap.put(outputConnectionName,list);
}
list.add(new Integer(i));
- i++;
}
// Create the return array.
DocumentIngestStatus[] rval = new DocumentIngestStatus[outputConnectionNames.length];
- Iterator iter = keyMap.keySet().iterator();
+ Iterator<String> iter = keyMap.keySet().iterator();
while (iter.hasNext())
{
- String outputConnectionName = (String)iter.next();
- ArrayList list = (ArrayList)keyMap.get(outputConnectionName);
+ String outputConnectionName = iter.next();
+ List<Integer> list = keyMap.get(outputConnectionName);
String[] localIdentifierClasses = new String[list.size()];
String[] localIdentifierHashes = new String[list.size()];
- i = 0;
- while (i < localIdentifierClasses.length)
+ for (int i = 0; i < localIdentifierClasses.length; i++)
{
- int index = ((Integer)list.get(i)).intValue();
+ int index = list.get(i).intValue();
localIdentifierClasses[i] = identifierClasses[index];
localIdentifierHashes[i] = identifierHashes[index];
- i++;
}
DocumentIngestStatus[] localRval = getDocumentIngestDataMultiple(outputConnectionName,localIdentifierClasses,localIdentifierHashes);
- i = 0;
- while (i < localRval.length)
+ for (int i = 0; i < localRval.length; i++)
{
- int index = ((Integer)list.get(i)).intValue();
+ int index = list.get(i).intValue();
rval[index] = localRval[i];
- i++;
}
}
return rval;
@@ -1697,6 +1881,7 @@ public class IncrementalIngester extends
* exist in the index.
*/
@Override
+ @Deprecated
public DocumentIngestStatus[] getDocumentIngestDataMultiple(String outputConnectionName,
String[] identifierClasses, String[] identifierHashes)
throws ManifoldCFException
@@ -1705,22 +1890,20 @@ public class IncrementalIngester extends
DocumentIngestStatus[] rval = new DocumentIngestStatus[identifierHashes.length];
// Build a map, so we can convert an identifier into an array index.
- HashMap indexMap = new HashMap();
- int i = 0;
- while (i < identifierHashes.length)
+ Map<String,Integer> indexMap = new HashMap<String,Integer>();
+ for (int i = 0; i < identifierHashes.length; i++)
{
indexMap.put(makeKey(identifierClasses[i],identifierHashes[i]),new Integer(i));
rval[i] = null;
- i++;
}
beginTransaction();
try
{
- ArrayList list = new ArrayList();
+ List<String> list = new ArrayList<String>();
int maxCount = maxClauseDocumentIngestDataChunk(outputConnectionName);
int j = 0;
- Iterator iter = indexMap.keySet().iterator();
+ Iterator<String> iter = indexMap.keySet().iterator();
while (iter.hasNext())
{
if (j == maxCount)
@@ -1759,6 +1942,7 @@ public class IncrementalIngester extends
*@return the current document's ingestion data, or null if the document is not currently ingested.
*/
@Override
+ @Deprecated
public DocumentIngestStatus getDocumentIngestData(String outputConnectionName,
String identifierClass, String identifierHash)
throws ManifoldCFException
@@ -1768,60 +1952,46 @@ public class IncrementalIngester extends
/** Calculate the average time interval between changes for a document.
* This is based on the data gathered for the document.
- *@param outputConnectionName is the name of the output connection associated with this action.
- *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
- *@param identifierHash is the hash of the id of the document.
- *@return the number of milliseconds between changes, or 0 if this cannot be calculated.
- */
- @Override
- public long getDocumentUpdateInterval(String outputConnectionName,
- String identifierClass, String identifierHash)
- throws ManifoldCFException
- {
- return getDocumentUpdateIntervalMultiple(outputConnectionName,new String[]{identifierClass},new String[]{identifierHash})[0];
- }
-
- /** Calculate the average time interval between changes for a document.
- * This is based on the data gathered for the document.
- *@param outputConnectionName is the name of the output connection associated with this action.
+ *@param pipelineSpecificationBasic is the basic pipeline specification.
*@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
*@param identifierHashes are the hashes of the ids of the documents.
*@return for each document, the smallest interval in milliseconds between changes found among its matching records, or 0 if this cannot be calculated.
*/
- @Override
- public long[] getDocumentUpdateIntervalMultiple(String outputConnectionName,
+ public long[] getDocumentUpdateIntervalMultiple(
+ IPipelineSpecificationBasic pipelineSpecificationBasic,
String[] identifierClasses, String[] identifierHashes)
throws ManifoldCFException
{
+ // Get the output connection names
+ String[] outputConnectionNames = extractOutputConnectionNames(pipelineSpecificationBasic);
+
// Do these all at once!!
// First, create a return array
long[] rval = new long[identifierHashes.length];
// Also create a map from identifier to return index.
- HashMap returnMap = new HashMap();
+ Map<String,Integer> returnMap = new HashMap<String,Integer>();
// Finally, need the set of hash codes
- HashMap idCodes = new HashMap();
- int j = 0;
- while (j < identifierHashes.length)
+ Set<String> idCodes = new HashSet<String>();
+ for (int j = 0; j < identifierHashes.length; j++)
{
String key = makeKey(identifierClasses[j],identifierHashes[j]);
- rval[j] = 0L;
+ rval[j] = Long.MAX_VALUE;
returnMap.put(key,new Integer(j));
- idCodes.put(key,key);
- j++;
+ idCodes.add(key);
}
// Get the chunk size
- int maxClause = maxClauseGetIntervals(outputConnectionName);
+ int maxClause = maxClauseGetIntervals(outputConnectionNames);
// Loop through the hash codes
- Iterator iter = idCodes.keySet().iterator();
- ArrayList list = new ArrayList();
- j = 0;
+ Iterator<String> iter = idCodes.iterator();
+ List<String> list = new ArrayList<String>();
+ int j = 0;
while (iter.hasNext())
{
if (j == maxClause)
{
- getIntervals(rval,outputConnectionName,list,returnMap);
+ getIntervals(rval,outputConnectionNames,list,returnMap);
list.clear();
j = 0;
}
@@ -1831,40 +2001,97 @@ public class IncrementalIngester extends
}
if (j > 0)
- getIntervals(rval,outputConnectionName,list,returnMap);
+ getIntervals(rval,outputConnectionNames,list,returnMap);
+ for (int i = 0; i < rval.length; i++)
+ {
+ if (rval[i] == Long.MAX_VALUE)
+ rval[i] = 0;
+ }
+
return rval;
+
}
+ /** Calculate the average time interval between changes for a document.
+ * This is based on the data gathered for the document.
+ *@param pipelineSpecificationBasic is the basic pipeline specification.
+ *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
+ *@param identifierHash is the hash of the id of the document.
+ *@return the number of milliseconds between changes, or 0 if this cannot be calculated.
+ */
+ @Override
+ public long getDocumentUpdateInterval(
+ IPipelineSpecificationBasic pipelineSpecificationBasic,
+ String identifierClass, String identifierHash)
+ throws ManifoldCFException
+ {
+ return getDocumentUpdateIntervalMultiple(
+ pipelineSpecificationBasic,
+ new String[]{identifierClass},new String[]{identifierHash})[0];
+ }
+
+ /** Calculate the average time interval between changes for a document.
+ * This is based on the data gathered for the document.
+ *@param outputConnectionName is the name of the output connection associated with this action.
+ *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
+ *@param identifierHash is the hash of the id of the document.
+ *@return the number of milliseconds between changes, or 0 if this cannot be calculated.
+ */
+ @Override
+ @Deprecated
+ public long getDocumentUpdateInterval(String outputConnectionName,
+ String identifierClass, String identifierHash)
+ throws ManifoldCFException
+ {
+ return getDocumentUpdateIntervalMultiple(outputConnectionName,new String[]{identifierClass},new String[]{identifierHash})[0];
+ }
+
+ /** Calculate the average time interval between changes for a document.
+ * This is based on the data gathered for the document.
+ *@param outputConnectionName is the name of the output connection associated with this action.
+ *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
+ *@param identifierHashes are the hashes of the ids of the documents.
+ *@return the number of milliseconds between changes, or 0 if this cannot be calculated.
+ */
+ @Override
+ @Deprecated
+ public long[] getDocumentUpdateIntervalMultiple(String outputConnectionName,
+ String[] identifierClasses, String[] identifierHashes)
+ throws ManifoldCFException
+ {
+ return getDocumentUpdateIntervalMultiple(new RuntPipelineSpecificationBasic(outputConnectionName),
+ identifierClasses,identifierHashes);
+ }
+
/** Calculate the number of clauses.
*/
- protected int maxClauseGetIntervals(String outputConnectionName)
+ protected int maxClauseGetIntervals(String[] outputConnectionNames)
{
return findConjunctionClauseMax(new ClauseDescription[]{
- new UnitaryClause(outputConnNameField,outputConnectionName)});
- }
-
+ new MultiClause(outputConnNameField,outputConnectionNames)});
+ }
+
/** Query for and calculate the interval for a bunch of hashcodes.
*@param rval is the array to stuff calculated return values into.
*@param outputConnectionNames are the names of the output connections the query is restricted to.
*@param list is the list of document keys to query for in this chunk.
*@param returnMap is a mapping from document id to rval index.
*/
- protected void getIntervals(long[] rval, String outputConnectionName, ArrayList list, HashMap returnMap)
+ protected void getIntervals(long[] rval, String[] outputConnectionNames, List<String> list, Map<String,Integer> returnMap)
throws ManifoldCFException
{
ArrayList newList = new ArrayList();
String query = buildConjunctionClause(newList,new ClauseDescription[]{
new MultiClause(docKeyField,list),
- new UnitaryClause(outputConnNameField,outputConnectionName)});
+ new MultiClause(outputConnNameField,outputConnectionNames)});
IResultSet set = performQuery("SELECT "+docKeyField+","+changeCountField+","+firstIngestField+","+lastIngestField+
" FROM "+getTableName()+" WHERE "+query,newList,null,null);
- int i = 0;
- while (i < set.getRowCount())
+ for (int i = 0; i < set.getRowCount(); i++)
{
- IResultRow row = set.getRow(i++);
+ IResultRow row = set.getRow(i);
String docHash = (String)row.getValue(docKeyField);
Integer index = (Integer)returnMap.get(docHash);
if (index != null)
@@ -1873,7 +2100,10 @@ public class IncrementalIngester extends
long changeCount = ((Long)row.getValue(changeCountField)).longValue();
long firstIngest = ((Long)row.getValue(firstIngestField)).longValue();
long lastIngest = ((Long)row.getValue(lastIngestField)).longValue();
- rval[index.intValue()] = (long)(((double)(lastIngest-firstIngest))/(double)changeCount);
+ int indexValue = index.intValue();
+ long newValue = (long)(((double)(lastIngest-firstIngest))/(double)changeCount);
+ if (newValue < rval[indexValue])
+ rval[indexValue] = newValue;
}
}
}
@@ -2115,7 +2345,7 @@ public class IncrementalIngester extends
*@param clause is the in clause for the query.
*@param list is the parameter list for the query.
*/
- protected void getDocumentURIChunk(DeleteInfo[] rval, Map map, String outputConnectionName, ArrayList list)
+ protected void getDocumentURIChunk(DeleteInfo[] rval, Map<String,Integer> map, String outputConnectionName, List<String> list)
throws ManifoldCFException
{
ArrayList newList = new ArrayList();
@@ -2127,10 +2357,9 @@ public class IncrementalIngester extends
query,newList,null,null);
// Go through list and put into buckets.
- int i = 0;
- while (i < set.getRowCount())
+ for (int i = 0; i < set.getRowCount(); i++)
{
- IResultRow row = set.getRow(i++);
+ IResultRow row = set.getRow(i);
String docHash = row.getValue(docKeyField).toString();
Integer position = (Integer)map.get(docHash);
if (position != null)
@@ -2151,6 +2380,14 @@ public class IncrementalIngester extends
return findConjunctionClauseMax(new ClauseDescription[]{
new UnitaryClause(outputConnNameField,outputConnectionName)});
}
+
+ /** Count the clauses
+ */
+ protected int maxClausePipelineDocumentIngestDataChunk(String[] outputConnectionNames)
+ {
+ return findConjunctionClauseMax(new ClauseDescription[]{
+ new MultiClause(outputConnNameField,outputConnectionNames)});
+ }
/** Get a chunk of document ingest data records.
*@param rval is the document ingest status array where the data should be put.
@@ -2158,7 +2395,7 @@ public class IncrementalIngester extends
*@param clause is the in clause for the query.
*@param list is the parameter list for the query.
*/
- protected void getDocumentIngestDataChunk(DocumentIngestStatus[] rval, Map map, String outputConnectionName, ArrayList list)
+ protected void getDocumentIngestDataChunk(DocumentIngestStatus[] rval, Map<String,Integer> map, String outputConnectionName, List<String> list)
throws ManifoldCFException
{
ArrayList newList = new ArrayList();
@@ -2171,12 +2408,11 @@ public class IncrementalIngester extends
" FROM "+getTableName()+" WHERE "+query,newList,null,null);
// Now, go through the original request once more, this time building the result
- int i = 0;
- while (i < set.getRowCount())
+ for (int i = 0; i < set.getRowCount(); i++)
{
- IResultRow row = set.getRow(i++);
+ IResultRow row = set.getRow(i);
String docHash = row.getValue(docKeyField).toString();
- Integer position = (Integer)map.get(docHash);
+ Integer position = map.get(docHash);
if (position != null)
{
Long id = (Long)row.getValue(idField);
@@ -2195,7 +2431,9 @@ public class IncrementalIngester extends
String authorityName = (String)row.getValue(authorityNameField);
if (authorityName == null)
authorityName = "";
- rval[position.intValue()] = new DocumentIngestStatus(lastVersion,lastTransformationVersion,lastOutputVersion,paramVersion,authorityName);
+ int indexValue = position.intValue();
+ rval[indexValue] = new DocumentIngestStatus(
+ lastVersion,lastTransformationVersion,lastOutputVersion,paramVersion,authorityName);
}
}
}
@@ -2502,10 +2740,11 @@ public class IncrementalIngester extends
Map<Integer,PipelineCheckEntryPoint> currentSet = new HashMap<Integer,PipelineCheckEntryPoint>();
// First, locate all the output stages, and enter them into the set
IPipelineSpecification spec = pipelineConnections.getSpecification();
- int count = spec.getOutputCount();
+ IPipelineSpecificationBasic basicSpec = spec.getBasicPipelineSpecification();
+ int count = basicSpec.getOutputCount();
for (int i = 0; i < count; i++)
{
- int outputStage = spec.getOutputStage(i);
+ int outputStage = basicSpec.getOutputStage(i);
PipelineCheckEntryPoint outputStageEntryPoint = new PipelineCheckEntryPoint(
outputConnectors[pipelineConnections.getOutputConnectionIndex(outputStage).intValue()],
spec.getStageDescriptionString(outputStage),finalActivity);
@@ -2518,9 +2757,9 @@ public class IncrementalIngester extends
int[] siblings = null;
for (Integer outputStage : currentSet.keySet())
{
- parent = spec.getStageParent(outputStage.intValue());
+ parent = basicSpec.getStageParent(outputStage.intValue());
// Look up the children
- siblings = spec.getStageChildren(parent);
+ siblings = basicSpec.getStageChildren(parent);
// Are all the siblings in the current set yet? If not, we can't proceed with this entry.
boolean skipToNext = false;
for (int sibling : siblings)
@@ -2595,18 +2834,21 @@ public class IncrementalIngester extends
// Create the current set
Map<Integer,PipelineAddEntryPoint> currentSet = new HashMap<Integer,PipelineAddEntryPoint>();
// First, locate all the output stages, and enter them into the set
- IPipelineSpecificationWithVersions spec = pipelineConnectionsWithVersions.getSpecificationWithVersions();
- int outputCount = spec.getOutputCount();
+ IPipelineSpecificationWithVersions fullSpec = pipelineConnectionsWithVersions.getSpecificationWithVersions();
+ IPipelineSpecification pipelineSpec = fullSpec.getPipelineSpecification();
+ IPipelineSpecificationBasic basicSpec = pipelineSpec.getBasicPipelineSpecification();
+
+ int outputCount = basicSpec.getOutputCount();
for (int i = 0; i < outputCount; i++)
{
- int outputStage = spec.getOutputStage(i);
+ int outputStage = basicSpec.getOutputStage(i);
// Compute whether we need to reindex this record to this output or not, based on spec.
- String oldDocumentVersion = spec.getOutputDocumentVersionString(i);
- String oldParameterVersion = spec.getOutputParameterVersionString(i);
- String oldOutputVersion = spec.getOutputVersionString(i);
- String oldTransformationVersion = spec.getOutputTransformationVersionString(i);
- String oldAuthorityName = spec.getAuthorityNameString(i);
+ String oldDocumentVersion = fullSpec.getOutputDocumentVersionString(i);
+ String oldParameterVersion = fullSpec.getOutputParameterVersionString(i);
+ String oldOutputVersion = fullSpec.getOutputVersionString(i);
+ String oldTransformationVersion = fullSpec.getOutputTransformationVersionString(i);
+ String oldAuthorityName = fullSpec.getAuthorityNameString(i);
String newTransformationVersion = null;
@@ -2615,23 +2857,23 @@ public class IncrementalIngester extends
{
needToReindex = (!oldDocumentVersion.equals(newDocumentVersion) ||
!oldParameterVersion.equals(newParameterVersion) ||
- !oldOutputVersion.equals(spec.getStageDescriptionString(outputStage)) ||
+ !oldOutputVersion.equals(pipelineSpec.getStageDescriptionString(outputStage)) ||
!oldAuthorityName.equals(newAuthorityNameString));
}
if (needToReindex == false)
{
// Compute the transformation version string
- newTransformationVersion = computePackedTransformationVersion(spec,outputStage);
+ newTransformationVersion = computePackedTransformationVersion(pipelineSpec,outputStage);
needToReindex = (!oldTransformationVersion.equals(newTransformationVersion));
}
int connectionIndex = pipelineConnectionsWithVersions.getOutputConnectionIndex(outputStage).intValue();
PipelineAddEntryPoint outputStageEntryPoint = new OutputAddEntryPoint(
outputConnectors[connectionIndex],
- spec.getStageDescriptionString(outputStage),
- new OutputActivitiesWrapper(finalActivity,spec.getStageConnectionName(outputStage)),
+ pipelineSpec.getStageDescriptionString(outputStage),
+ new OutputActivitiesWrapper(finalActivity,basicSpec.getStageConnectionName(outputStage)),
needToReindex,
- spec.getStageConnectionName(outputStage),
+ basicSpec.getStageConnectionName(outputStage),
newTransformationVersion,
ingestTime,
newDocumentVersion,
@@ -2646,9 +2888,9 @@ public class IncrementalIngester extends
int[] siblings = null;
for (Integer outputStage : currentSet.keySet())
{
- parent = spec.getStageParent(outputStage.intValue());
+ parent = basicSpec.getStageParent(outputStage.intValue());
// Look up the children
- siblings = spec.getStageChildren(parent);
+ siblings = basicSpec.getStageChildren(parent);
// Are all the siblings in the current set yet? If not, we can't proceed with this entry.
boolean skipToNext = false;
for (int sibling : siblings)
@@ -2680,13 +2922,13 @@ public class IncrementalIngester extends
// Wrap the entry points in a fan-out class, which has pipe connector-like methods that fire across all the connectors.
PipelineAddFanout pcf = new PipelineAddFanout(siblingEntryPoints,
(parent==-1)?null:new TransformationRecordingActivity(finalActivity,
- spec.getStageConnectionName(parent)),
+ basicSpec.getStageConnectionName(parent)),
finalActivity);
if (parent == -1)
return pcf;
PipelineAddEntryPoint newEntry = new PipelineAddEntryPoint(
transformationConnectors[pipelineConnections.getTransformationConnectionIndex(parent).intValue()],
- spec.getStageDescriptionString(parent),pcf,pcf.checkNeedToReindex());
+ pipelineSpec.getStageDescriptionString(parent),pcf,pcf.checkNeedToReindex());
currentSet.put(new Integer(parent), newEntry);
}
@@ -3277,7 +3519,16 @@ public class IncrementalIngester extends
super(outputConnectionName);
this.outputDescriptionString = outputDescriptionString;
}
-
+
+ /** Get the basic pipeline specification; this object serves as its own basic specification.
+ *@return this object, viewed as an IPipelineSpecificationBasic.
+ */
+ @Override
+ public IPipelineSpecificationBasic getBasicPipelineSpecification()
+ {
+ return this;
+ }
+
/** Get the description string for a pipeline stage.
*@param stage is the stage to get the connection name for.
*@return the description string that stage.
@@ -3312,11 +3563,21 @@ public class IncrementalIngester extends
this.oldTransformationVersion = oldTransformationVersion;
this.oldAuthorityNameString = oldAuthorityNameString;
}
-
+
+ /** Get the pipeline specification; this object serves as its own pipeline specification.
+ *@return this object, viewed as an IPipelineSpecification.
+ */
+ @Override
+ public IPipelineSpecification getPipelineSpecification()
+ {
+ return this;
+ }
+
/** For a given output index, return a document version string.
*@param index is the output index.
*@return the document version string.
*/
+ @Override
public String getOutputDocumentVersionString(int index)
{
if (index == 0)
@@ -3328,6 +3589,7 @@ public class IncrementalIngester extends
*@param index is the output index.
*@return the parameter version string.
*/
+ @Override
public String getOutputParameterVersionString(int index)
{
if (index == 0)
@@ -3339,6 +3601,7 @@ public class IncrementalIngester extends
*@param index is the output index.
*@return the transformation version string.
*/
+ @Override
public String getOutputTransformationVersionString(int index)
{
if (index == 0)
@@ -3350,6 +3613,7 @@ public class IncrementalIngester extends
*@param index is the output index.
*@return the output version string.
*/
+ @Override
public String getOutputVersionString(int index)
{
if (index == 0)
@@ -3361,6 +3625,7 @@ public class IncrementalIngester extends
*@param index is the output index.
*@return the authority name string.
*/
+ @Override
public String getAuthorityNameString(int index)
{
if (index == 0)
@@ -3388,17 +3653,18 @@ public class IncrementalIngester extends
throws ManifoldCFException
{
this.spec = spec;
+ IPipelineSpecificationBasic basicSpec = spec.getBasicPipelineSpecification();
// Now, load all the connections we'll ever need, being sure to only load one copy of each.
// We first segregate them into unique transformation and output connections.
- int count = spec.getStageCount();
+ int count = basicSpec.getStageCount();
Set<String> transformations = new HashSet<String>();
Set<String> outputs = new HashSet<String>();
for (int i = 0; i < count; i++)
{
- if (spec.checkStageOutputConnection(i))
- outputs.add(spec.getStageConnectionName(i));
+ if (basicSpec.checkStageOutputConnection(i))
+ outputs.add(basicSpec.getStageConnectionName(i));
else
- transformations.add(spec.getStageConnectionName(i));
+ transformations.add(basicSpec.getStageConnectionName(i));
}
Map<String,Integer> transformationNameMap = new HashMap<String,Integer>();
@@ -3424,13 +3690,13 @@ public class IncrementalIngester extends
for (int i = 0; i < count; i++)
{
Integer k;
- if (spec.checkStageOutputConnection(i))
+ if (basicSpec.checkStageOutputConnection(i))
{
- outputConnectionLookupMap.put(new Integer(i),outputNameMap.get(spec.getStageConnectionName(i)));
+ outputConnectionLookupMap.put(new Integer(i),outputNameMap.get(basicSpec.getStageConnectionName(i)));
}
else
{
- transformationConnectionLookupMap.put(new Integer(i),transformationNameMap.get(spec.getStageConnectionName(i)));
+ transformationConnectionLookupMap.put(new Integer(i),transformationNameMap.get(basicSpec.getStageConnectionName(i)));
}
}
}
@@ -3479,7 +3745,7 @@ public class IncrementalIngester extends
public PipelineConnectionsWithVersions(IPipelineSpecificationWithVersions pipelineSpecificationWithVersions)
throws ManifoldCFException
{
- super(pipelineSpecificationWithVersions);
+ super(pipelineSpecificationWithVersions.getPipelineSpecification());
this.pipelineSpecificationWithVersions = pipelineSpecificationWithVersions;
}
Modified: manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/DocumentIngestStatus.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/DocumentIngestStatus.java?rev=1602613&r1=1602612&r2=1602613&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/DocumentIngestStatus.java (original)
+++ manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/DocumentIngestStatus.java Sat Jun 14 18:14:42 2014
@@ -38,10 +38,12 @@ public class DocumentIngestStatus
protected final String documentAuthorityNameString;
/** Constructor */
- public DocumentIngestStatus(String documentVersionString,
+ public DocumentIngestStatus(
+ String documentVersionString,
String transformationVersionString, String outputVersionString, String parameterVersionString,
String documentAuthorityNameString)
{
+ // Looked-up information
this.documentVersionString = documentVersionString;
this.transformationVersionString = transformationVersionString;
this.outputVersionString = outputVersionString;
Modified: manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java?rev=1602613&r1=1602612&r2=1602613&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java (original)
+++ manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IIncrementalIngester.java Sat Jun 14 18:14:42 2014
@@ -60,6 +60,13 @@ public interface IIncrementalIngester
throws ManifoldCFException;
+ /** From a pipeline specification, get the name of the output connection that will be indexed last
+ * in the pipeline.
+ *@param pipelineSpecificationBasic is the basic pipeline specification.
+ *@return the last indexed output connection name.
+ */
+ public String getLastIndexedOutputConnectionName(IPipelineSpecificationBasic pipelineSpecificationBasic);
+
/** Get an output version string for a document.
*@param outputConnectionName is the name of the output connection associated with this action.
*@param spec is the output specification.
@@ -432,6 +439,7 @@ public interface IIncrementalIngester
*@return the array of document data. Null will come back for any identifier that doesn't
* exist in the index.
*/
+ @Deprecated
public DocumentIngestStatus[] getDocumentIngestDataMultiple(String[] outputConnectionNames,
String[] identifierClasses, String[] identifierHashes)
throws ManifoldCFException;
@@ -443,6 +451,7 @@ public interface IIncrementalIngester
*@return the array of document data. Null will come back for any identifier that doesn't
* exist in the index.
*/
+ @Deprecated
public DocumentIngestStatus[] getDocumentIngestDataMultiple(String outputConnectionName,
String[] identifierClasses, String[] identifierHashes)
throws ManifoldCFException;
@@ -453,10 +462,47 @@ public interface IIncrementalIngester
*@param identifierHash is the hash of the id of the document.
*@return the current document's ingestion data, or null if the document is not currently ingested.
*/
+ @Deprecated
public DocumentIngestStatus getDocumentIngestData(String outputConnectionName,
String identifierClass, String identifierHash)
throws ManifoldCFException;
+ /** Look up ingestion data for a set of documents.
+ *@param rval is a map of output key to document data, in no particular order, which will be loaded with all matching results.
+ *@param pipelineSpecificationBasics are the pipeline specifications corresponding to the identifier classes and hashes.
+ *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
+ *@param identifierHashes is the array of document identifier hashes to look up.
+ */
+ public void getPipelineDocumentIngestDataMultiple(
+ Map<OutputKey,DocumentIngestStatus> rval,
+ IPipelineSpecificationBasic[] pipelineSpecificationBasics,
+ String[] identifierClasses, String[] identifierHashes)
+ throws ManifoldCFException;
+
+ /** Look up ingestion data for a SET of documents.
+ *@param rval is a map of output key to document data, in no particular order, which will be loaded with all matching results.
+ *@param pipelineSpecificationBasic is the pipeline specification for all documents.
+ *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
+ *@param identifierHashes is the array of document identifier hashes to look up.
+ */
+ public void getPipelineDocumentIngestDataMultiple(
+ Map<OutputKey,DocumentIngestStatus> rval,
+ IPipelineSpecificationBasic pipelineSpecificationBasic,
+ String[] identifierClasses, String[] identifierHashes)
+ throws ManifoldCFException;
+
+ /** Look up ingestion data for a document.
+ *@param rval is a map of output key to document data, in no particular order, which will be loaded with all matching results.
+ *@param pipelineSpecificationBasic is the pipeline specification for the document.
+ *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
+ *@param identifierHash is the hash of the id of the document.
+ */
+ public void getPipelineDocumentIngestData(
+ Map<OutputKey,DocumentIngestStatus> rval,
+ IPipelineSpecificationBasic pipelineSpecificationBasic,
+ String identifierClass, String identifierHash)
+ throws ManifoldCFException;
+
/** Calculate the average time interval between changes for a document.
* This is based on the data gathered for the document.
*@param outputConnectionName is the name of the output connection associated with this action.
@@ -464,6 +510,7 @@ public interface IIncrementalIngester
*@param identifierHashes is the hashes of the ids of the documents.
*@return the number of milliseconds between changes, or 0 if this cannot be calculated.
*/
+ @Deprecated
public long[] getDocumentUpdateIntervalMultiple(String outputConnectionName,
String[] identifierClasses, String[] identifierHashes)
throws ManifoldCFException;
@@ -475,10 +522,35 @@ public interface IIncrementalIngester
*@param identifierHash is the hash of the id of the document.
*@return the number of milliseconds between changes, or 0 if this cannot be calculated.
*/
+ @Deprecated
public long getDocumentUpdateInterval(String outputConnectionName,
String identifierClass, String identifierHash)
throws ManifoldCFException;
+ /** Calculate the average time interval between changes for a document.
+ * This is based on the data gathered for the document.
+ *@param pipelineSpecificationBasic is the basic pipeline specification.
+ *@param identifierClasses are the names of the spaces in which the identifier hashes should be interpreted.
+ *@param identifierHashes are the hashes of the ids of the documents.
+ *@return the number of milliseconds between changes, or 0 if this cannot be calculated.
+ */
+ public long[] getDocumentUpdateIntervalMultiple(
+ IPipelineSpecificationBasic pipelineSpecificationBasic,
+ String[] identifierClasses, String[] identifierHashes)
+ throws ManifoldCFException;
+
+ /** Calculate the average time interval between changes for a document.
+ * This is based on the data gathered for the document.
+ *@param pipelineSpecificationBasic is the basic pipeline specification.
+ *@param identifierClass is the name of the space in which the identifier hash should be interpreted.
+ *@param identifierHash is the hash of the id of the document.
+ *@return the number of milliseconds between changes, or 0 if this cannot be calculated.
+ */
+ public long getDocumentUpdateInterval(
+ IPipelineSpecificationBasic pipelineSpecificationBasic,
+ String identifierClass, String identifierHash)
+ throws ManifoldCFException;
+
/** Reset all documents belonging to a specific output connection, because we've got information that
* that system has been reconfigured. This will force all such documents to be reindexed the next time
* they are checked.
Modified: manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineSpecification.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineSpecification.java?rev=1602613&r1=1602612&r2=1602613&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineSpecification.java (original)
+++ manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineSpecification.java Sat Jun 14 18:14:42 2014
@@ -23,10 +23,15 @@ import org.apache.manifoldcf.core.interf
/** This interface describes a multi-output pipeline, where each stage has an already-computed
* description string.
*/
-public interface IPipelineSpecification extends IPipelineSpecificationBasic
+public interface IPipelineSpecification
{
public static final String _rcsid = "@(#)$Id$";
+ /** Get the basic pipeline specification.
+ *@return the specification.
+ */
+ public IPipelineSpecificationBasic getBasicPipelineSpecification();
+
/** Get the description string for a pipeline stage.
*@param stage is the stage to get the connection name for.
*@return the description string that stage.
Modified: manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineSpecificationWithVersions.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineSpecificationWithVersions.java?rev=1602613&r1=1602612&r2=1602613&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineSpecificationWithVersions.java (original)
+++ manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/IPipelineSpecificationWithVersions.java Sat Jun 14 18:14:42 2014
@@ -23,10 +23,15 @@ import org.apache.manifoldcf.core.interf
/** This interface describes a multi-output pipeline, with existing document version information from
* each output..
*/
-public interface IPipelineSpecificationWithVersions extends IPipelineSpecification
+public interface IPipelineSpecificationWithVersions
{
public static final String _rcsid = "@(#)$Id$";
+ /** Get pipeline specification.
+ *@return the pipeline specification.
+ */
+ public IPipelineSpecification getPipelineSpecification();
+
/** For a given output index, return a document version string.
*@param index is the output index.
*@return the document version string.
Added: manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/OutputKey.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/OutputKey.java?rev=1602613&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/OutputKey.java (added)
+++ manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/OutputKey.java Sat Jun 14 18:14:42 2014
@@ -0,0 +1,110 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.agents.interfaces;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import java.util.*;
+
+/** This object functions as a key describing a unique output, consisting of:
+* - a document class
+* - a document ID hash
+* - an output connection name
+* Instances are immutable; equality and hashing are based on all three components,
+* so that the object can serve as a map key.
+*/
+public class OutputKey
+{
+  public static final String _rcsid = "@(#)$Id$";
+
+  /** The document class component of the key. */
+  protected final String documentClass;
+  /** The document ID hash component of the key. */
+  protected final String documentIDHash;
+  /** The output connection name component of the key. */
+  protected final String outputConnectionName;
+
+  /** Constructor.
+  *@param documentClass is the document class.
+  *@param documentIDHash is the document ID hash.
+  *@param outputConnectionName is the output connection name.
+  */
+  public OutputKey(String documentClass, String documentIDHash, String outputConnectionName)
+  {
+    // Identifying information
+    this.documentClass = documentClass;
+    this.documentIDHash = documentIDHash;
+    this.outputConnectionName = outputConnectionName;
+  }
+
+  /** Get the document class.
+  *@return the document class.
+  */
+  public String getDocumentClass()
+  {
+    return documentClass;
+  }
+
+  /** Get the document ID hash.
+  *@return the document ID hash.
+  */
+  public String getDocumentIDHash()
+  {
+    return documentIDHash;
+  }
+
+  /** Get the output connection name.
+  *@return the output connection name.
+  */
+  public String getOutputConnectionName()
+  {
+    return outputConnectionName;
+  }
+
+  /** Compute a hash code from all three key components.
+  * The components are combined positionally (31-multiplier scheme), so swapping two
+  * component values normally changes the result, unlike a plain sum of the three hashes.
+  * NOTE: every component is dereferenced here, so a null component raises NullPointerException.
+  *@return the hash code.
+  */
+  @Override
+  public int hashCode()
+  {
+    int rval = documentClass.hashCode();
+    rval = 31 * rval + documentIDHash.hashCode();
+    rval = 31 * rval + outputConnectionName.hashCode();
+    return rval;
+  }
+
+  /** Compare this key against another object.
+  *@param o is the object to compare against.
+  *@return true if o is an OutputKey whose document class, document ID hash, and output
+  * connection name all equal this key's; false otherwise.
+  */
+  @Override
+  public boolean equals(Object o)
+  {
+    if (!(o instanceof OutputKey))
+      return false;
+    OutputKey other = (OutputKey)o;
+    return other.documentClass.equals(documentClass) &&
+      other.documentIDHash.equals(documentIDHash) &&
+      other.outputConnectionName.equals(outputConnectionName);
+  }
+
+}
Propchange: manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/OutputKey.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: manifoldcf/branches/CONNECTORS-962/framework/agents/src/main/java/org/apache/manifoldcf/agents/interfaces/OutputKey.java
------------------------------------------------------------------------------
svn:keywords = Id
Modified: manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/DocumentDescription.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/DocumentDescription.java?rev=1602613&r1=1602612&r2=1602613&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/DocumentDescription.java (original)
+++ manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/DocumentDescription.java Sat Jun 14 18:14:42 2014
@@ -27,12 +27,12 @@ public class DocumentDescription
public static final String _rcsid = "@(#)$Id: DocumentDescription.java 988245 2010-08-23 18:39:35Z kwright $";
// Member variables
- protected Long id;
- protected Long jobID;
- protected String documentIdentifierHash;
- protected String documentIdentifier;
- protected long failTime;
- protected int failRetryCount;
+ protected final Long id;
+ protected final Long jobID;
+ protected final String documentIdentifierHash;
+ protected final String documentIdentifier;
+ protected final long failTime;
+ protected final int failRetryCount;
/** Constructor.
*@param id is the record id.
Modified: manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java?rev=1602613&r1=1602612&r2=1602613&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java (original)
+++ manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java Sat Jun 14 18:14:42 2014
@@ -110,7 +110,7 @@ public class DocumentCleanupThread exten
IJobDescription job = dds.getJobDescription();
String connectionName = job.getConnectionName();
- String outputConnectionName = job.getOutputConnectionName();
+ IPipelineSpecificationBasic pipelineSpecificationBasic = new PipelineSpecificationBasic(job);
try
{
@@ -120,13 +120,12 @@ public class DocumentCleanupThread exten
IRepositoryConnection connection = connMgr.load(connectionName);
// This is where we store the hopcount cleanup data
- ArrayList arrayDocHashes = new ArrayList();
- ArrayList arrayDocsToDelete = new ArrayList();
- ArrayList arrayRelationshipTypes = new ArrayList();
- ArrayList hopcountMethods = new ArrayList();
+ List<String> arrayDocHashes = new ArrayList<String>();
+ List<CleanupQueuedDocument> arrayDocsToDelete = new ArrayList<CleanupQueuedDocument>();
+ List<String[]> arrayRelationshipTypes = new ArrayList<String[]>();
+ List<Integer> hopcountMethods = new ArrayList<Integer>();
- int j = 0;
- while (j < dds.getCount())
+ for (int j = 0; j < dds.getCount(); j++)
{
CleanupQueuedDocument dqd = dds.getDocument(j);
DocumentDescription ddd = dqd.getDocumentDescription();
@@ -142,7 +141,6 @@ public class DocumentCleanupThread exten
hopcountMethods.add(new Integer(job.getHopcountMode()));
}
}
- j++;
}
// Grab one connection for each connectionName. If we fail, nothing is lost and retries are possible.
@@ -154,18 +152,16 @@ public class DocumentCleanupThread exten
boolean[] deleteFromQueue = new boolean[arrayDocHashes.size()];
// Count the number of docs to actually delete. This will be a subset of the documents in the list.
- int k = 0;
int removeCount = 0;
- while (k < arrayDocHashes.size())
+ for (int k = 0; k < arrayDocHashes.size(); k++)
{
- if (((CleanupQueuedDocument)arrayDocsToDelete.get(k)).shouldBeRemovedFromIndex())
+ if (arrayDocsToDelete.get(k).shouldBeRemovedFromIndex())
{
deleteFromQueue[k] = false;
removeCount++;
}
else
deleteFromQueue[k] = true;
- k++;
}
// Allocate removal arrays
@@ -173,33 +169,29 @@ public class DocumentCleanupThread exten
String[] hashedDocsToRemove = new String[removeCount];
// Now, iterate over the list
- k = 0;
removeCount = 0;
- while (k < arrayDocHashes.size())
+ for (int k = 0; k < arrayDocHashes.size(); k++)
{
- if (((CleanupQueuedDocument)arrayDocsToDelete.get(k)).shouldBeRemovedFromIndex())
+ if (arrayDocsToDelete.get(k).shouldBeRemovedFromIndex())
{
docClassesToRemove[removeCount] = connectionName;
- hashedDocsToRemove[removeCount] = (String)arrayDocHashes.get(k);
+ hashedDocsToRemove[removeCount] = arrayDocHashes.get(k);
removeCount++;
}
- k++;
}
- OutputRemoveActivity activities = new OutputRemoveActivity(connectionName,connMgr,outputConnectionName);
+ OutputRemoveActivity activities = new OutputRemoveActivity(connectionName,connMgr);
// Finally, go ahead and delete the documents from the ingestion system.
try
{
- ingester.documentDeleteMultiple(outputConnectionName,docClassesToRemove,hashedDocsToRemove,activities);
+ ingester.documentDeleteMultiple(pipelineSpecificationBasic,docClassesToRemove,hashedDocsToRemove,activities);
// Success! Label all these as needing deletion from queue.
- k = 0;
- while (k < arrayDocHashes.size())
+ for (int k = 0; k < arrayDocHashes.size(); k++)
{
- if (((CleanupQueuedDocument)arrayDocsToDelete.get(k)).shouldBeRemovedFromIndex())
+ if (arrayDocsToDelete.get(k).shouldBeRemovedFromIndex())
deleteFromQueue[k] = true;
- k++;
}
}
catch (ServiceInterruption e)
@@ -208,10 +200,9 @@ public class DocumentCleanupThread exten
// Go through the list of documents we just tried, and reset them on the queue based on the
// ServiceInterruption parameters. Then we must proceed to delete ONLY the documents that
// were not part of the index deletion attempt.
- k = 0;
- while (k < arrayDocHashes.size())
+ for (int k = 0; k < arrayDocHashes.size(); k++)
{
- CleanupQueuedDocument cqd = (CleanupQueuedDocument)arrayDocsToDelete.get(k);
+ CleanupQueuedDocument cqd = arrayDocsToDelete.get(k);
if (cqd.shouldBeRemovedFromIndex())
{
DocumentDescription dd = cqd.getDocumentDescription();
@@ -219,23 +210,21 @@ public class DocumentCleanupThread exten
jobManager.resetCleaningDocument(dd,e.getRetryTime());
cqd.setProcessed();
}
- k++;
}
}
// Successfully deleted some documents from ingestion system. Now, remove them from job queue. This
// must currently happen one document at a time, because the jobs and connectors for each document
// potentially differ.
- k = 0;
- while (k < arrayDocHashes.size())
+ for (int k = 0; k < arrayDocHashes.size(); k++)
{
if (deleteFromQueue[k])
{
- DeleteQueuedDocument dqd = (DeleteQueuedDocument)arrayDocsToDelete.get(k);
+ CleanupQueuedDocument dqd = arrayDocsToDelete.get(k);
DocumentDescription ddd = dqd.getDocumentDescription();
Long jobID = ddd.getJobID();
- int hopcountMethod = ((Integer)hopcountMethods.get(k)).intValue();
- String[] legalLinkTypes = (String[])arrayRelationshipTypes.get(k);
+ int hopcountMethod = hopcountMethods.get(k).intValue();
+ String[] legalLinkTypes = arrayRelationshipTypes.get(k);
DocumentDescription[] requeueCandidates = jobManager.markDocumentCleanedUp(jobID,legalLinkTypes,ddd,hopcountMethod);
// Use the common method for doing the requeuing
ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,
@@ -243,7 +232,6 @@ public class DocumentCleanupThread exten
// Finally, completed expiration of the document.
dqd.setProcessed();
}
- k++;
}
}
finally
@@ -349,18 +337,15 @@ public class DocumentCleanupThread exten
{
// Connection name
- protected String connectionName;
+ protected final String connectionName;
// Connection manager
- protected IRepositoryConnectionManager connMgr;
- // Output connection name
- protected String outputConnectionName;
+ protected final IRepositoryConnectionManager connMgr;
/** Constructor */
- public OutputRemoveActivity(String connectionName, IRepositoryConnectionManager connMgr, String outputConnectionName)
+ public OutputRemoveActivity(String connectionName, IRepositoryConnectionManager connMgr)
{
this.connectionName = connectionName;
this.connMgr = connMgr;
- this.outputConnectionName = outputConnectionName;
}
/** Record time-stamped information about the activity of the output connector.
@@ -382,7 +367,7 @@ public class DocumentCleanupThread exten
String entityURI, String resultCode, String resultDescription)
throws ManifoldCFException
{
- connMgr.recordHistory(connectionName,startTime,ManifoldCF.qualifyOutputActivityName(activityType,outputConnectionName),dataSize,entityURI,resultCode,
+ connMgr.recordHistory(connectionName,startTime,activityType,dataSize,entityURI,resultCode,
resultDescription,null);
}
}
Modified: manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteThread.java?rev=1602613&r1=1602612&r2=1602613&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteThread.java (original)
+++ manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentDeleteThread.java Sat Jun 14 18:14:42 2014
@@ -100,8 +100,8 @@ public class DocumentDeleteThread extend
IJobDescription job = dds.getJobDescription();
String connectionName = job.getConnectionName();
- String outputConnectionName = job.getOutputConnectionName();
-
+ IPipelineSpecificationBasic pipelineSpecificationBasic = new PipelineSpecificationBasic(job);
+
try
{
// Do the delete work.
@@ -115,8 +115,7 @@ public class DocumentDeleteThread extend
String[] docClassesToRemove = new String[dds.getCount()];
String[] hashedDocsToRemove = new String[dds.getCount()];
DeleteQueuedDocument[] docsToDelete = new DeleteQueuedDocument[dds.getCount()];
- int j = 0;
- while (j < dds.getCount())
+ for (int j = 0; j < dds.getCount(); j++)
{
DeleteQueuedDocument dqd = dds.getDocument(j);
DocumentDescription ddd = dqd.getDocumentDescription();
@@ -124,19 +123,16 @@ public class DocumentDeleteThread extend
hashedDocsToRemove[j] = ddd.getDocumentIdentifierHash();
docsToDelete[j] = dqd;
deleteFromQueue[j] = false;
- j++;
}
- OutputRemoveActivity logger = new OutputRemoveActivity(connectionName,connMgr,outputConnectionName);
+ OutputRemoveActivity logger = new OutputRemoveActivity(connectionName,connMgr);
try
{
- ingester.documentDeleteMultiple(outputConnectionName,docClassesToRemove,hashedDocsToRemove,logger);
- j = 0;
- while (j < dds.getCount())
+ ingester.documentDeleteMultiple(pipelineSpecificationBasic,docClassesToRemove,hashedDocsToRemove,logger);
+ for (int j = 0; j < dds.getCount(); j++)
{
deleteFromQueue[j] = true;
- j++;
}
}
catch (ServiceInterruption e)
@@ -145,46 +141,38 @@ public class DocumentDeleteThread extend
// Go through the list of documents we just tried, and reset them on the queue based on the
// ServiceInterruption parameters. Then we must proceed to delete ONLY the documents that
// were not part of the index deletion attempt.
- j = 0;
- while (j < dds.getCount())
+ for (int j = 0; j < dds.getCount(); j++)
{
DeleteQueuedDocument cqd = docsToDelete[j];
DocumentDescription dd = cqd.getDocumentDescription();
// To recover from an expiration failure, requeue the document to COMPLETED etc.
jobManager.resetDeletingDocument(dd,e.getRetryTime());
cqd.setProcessed();
- j++;
}
}
// Count the records we're actually going to delete
int recordCount = 0;
- j = 0;
- while (j < dds.getCount())
+ for (int j = 0; j < dds.getCount(); j++)
{
if (deleteFromQueue[j])
recordCount++;
- j++;
}
// Delete the records
DocumentDescription[] deleteDescriptions = new DocumentDescription[recordCount];
- j = 0;
recordCount = 0;
- while (j < dds.getCount())
+ for (int j = 0; j < dds.getCount(); j++)
{
if (deleteFromQueue[j])
deleteDescriptions[recordCount++] = docsToDelete[j].getDocumentDescription();
- j++;
}
jobManager.deleteIngestedDocumentIdentifiers(deleteDescriptions);
// Mark them as gone
- j = 0;
- while (j < dds.getCount())
+ for (int j = 0; j < dds.getCount(); j++)
{
if (deleteFromQueue[j])
docsToDelete[j].wasProcessed();
- j++;
}
// Go around again
}
@@ -193,10 +181,9 @@ public class DocumentDeleteThread extend
// Here we should take steps to insure that the documents that have been handed to us
// are dealt with appropriately. This may involve setting the document state to "complete"
// so that they will be picked up again.
- int j = 0;
- while (j < dds.getCount())
+ for (int j = 0; j < dds.getCount(); j++)
{
- DeleteQueuedDocument dqd = dds.getDocument(j++);
+ DeleteQueuedDocument dqd = dds.getDocument(j);
if (dqd.wasProcessed() == false)
{
@@ -272,19 +259,16 @@ public class DocumentDeleteThread extend
/** The OutputRemoveActivity class */
protected static class OutputRemoveActivity implements IOutputRemoveActivity
{
- // Connection name
- protected String connectionName;
// Connection manager
- protected IRepositoryConnectionManager connMgr;
+ protected final IRepositoryConnectionManager connMgr;
// Output connection name
- protected String outputConnectionName;
+ protected final String connectionName;
/** Constructor */
- public OutputRemoveActivity(String connectionName, IRepositoryConnectionManager connMgr, String outputConnectionName)
+ public OutputRemoveActivity(String connectionName, IRepositoryConnectionManager connMgr)
{
this.connectionName = connectionName;
this.connMgr = connMgr;
- this.outputConnectionName = outputConnectionName;
}
/** Record time-stamped information about the activity of the output connector.
@@ -306,7 +290,7 @@ public class DocumentDeleteThread extend
String entityURI, String resultCode, String resultDescription)
throws ManifoldCFException
{
- connMgr.recordHistory(connectionName,startTime,ManifoldCF.qualifyOutputActivityName(activityType,outputConnectionName),dataSize,entityURI,resultCode,
+ connMgr.recordHistory(connectionName,startTime,activityType,dataSize,entityURI,resultCode,
resultDescription,null);
}
}
Modified: manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java?rev=1602613&r1=1602612&r2=1602613&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java (original)
+++ manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java Sat Jun 14 18:14:42 2014
@@ -100,7 +100,7 @@ public class ExpireThread extends Thread
IJobDescription job = dds.getJobDescription();
String connectionName = job.getConnectionName();
- String outputConnectionName = job.getOutputConnectionName();
+ IPipelineSpecificationBasic pipelineSpecificationBasic = new PipelineSpecificationBasic(job);
try
{
@@ -112,10 +112,10 @@ public class ExpireThread extends Thread
IRepositoryConnection connection = connMgr.load(connectionName);
// This is where we store the hopcount cleanup data
- ArrayList arrayDocHashes = new ArrayList();
- ArrayList arrayDocsToDelete = new ArrayList();
- ArrayList arrayRelationshipTypes = new ArrayList();
- ArrayList hopcountMethods = new ArrayList();
+ List<String> arrayDocHashes = new ArrayList<String>();
+ List<CleanupQueuedDocument> arrayDocsToDelete = new ArrayList<CleanupQueuedDocument>();
+ List<String[]> arrayRelationshipTypes = new ArrayList<String[]>();
+ List<Integer> hopcountMethods = new ArrayList<Integer>();
int j = 0;
while (j < dds.getCount())
@@ -146,18 +146,16 @@ public class ExpireThread extends Thread
boolean[] deleteFromQueue = new boolean[arrayDocHashes.size()];
// Count the number of docs to actually delete. This will be a subset of the documents in the list.
- int k = 0;
int removeCount = 0;
- while (k < arrayDocHashes.size())
+ for (int k = 0; k < arrayDocHashes.size(); k++)
{
- if (((CleanupQueuedDocument)arrayDocsToDelete.get(k)).shouldBeRemovedFromIndex())
+ if (arrayDocsToDelete.get(k).shouldBeRemovedFromIndex())
{
deleteFromQueue[k] = false;
removeCount++;
}
else
deleteFromQueue[k] = true;
- k++;
}
// Allocate removal arrays
@@ -165,33 +163,29 @@ public class ExpireThread extends Thread
String[] hashedDocsToRemove = new String[removeCount];
// Now, iterate over the list
- k = 0;
removeCount = 0;
- while (k < arrayDocHashes.size())
+ for (int k = 0; k < arrayDocHashes.size(); k++)
{
- if (((CleanupQueuedDocument)arrayDocsToDelete.get(k)).shouldBeRemovedFromIndex())
+ if (arrayDocsToDelete.get(k).shouldBeRemovedFromIndex())
{
docClassesToRemove[removeCount] = connectionName;
- hashedDocsToRemove[removeCount] = (String)arrayDocHashes.get(k);
+ hashedDocsToRemove[removeCount] = arrayDocHashes.get(k);
removeCount++;
}
- k++;
}
- OutputRemoveActivity activities = new OutputRemoveActivity(connectionName,connMgr,outputConnectionName);
+ OutputRemoveActivity activities = new OutputRemoveActivity(connectionName,connMgr);
// Finally, go ahead and delete the documents from the ingestion system.
// If we fail, we need to put the documents back on the queue.
try
{
- ingester.documentDeleteMultiple(outputConnectionName,docClassesToRemove,hashedDocsToRemove,activities);
+ ingester.documentDeleteMultiple(pipelineSpecificationBasic,docClassesToRemove,hashedDocsToRemove,activities);
// Success! Label all these as needing deletion from queue.
- k = 0;
- while (k < arrayDocHashes.size())
+ for (int k = 0; k < arrayDocHashes.size(); k++)
{
- if (((CleanupQueuedDocument)arrayDocsToDelete.get(k)).shouldBeRemovedFromIndex())
+ if (arrayDocsToDelete.get(k).shouldBeRemovedFromIndex())
deleteFromQueue[k] = true;
- k++;
}
}
catch (ServiceInterruption e)
@@ -200,10 +194,9 @@ public class ExpireThread extends Thread
// Go through the list of documents we just tried, and reset them on the queue based on the
// ServiceInterruption parameters. Then we must proceed to delete ONLY the documents that
// were not part of the index deletion attempt.
- k = 0;
- while (k < arrayDocHashes.size())
+ for (int k = 0; k < arrayDocHashes.size(); k++)
{
- CleanupQueuedDocument cqd = (CleanupQueuedDocument)arrayDocsToDelete.get(k);
+ CleanupQueuedDocument cqd = arrayDocsToDelete.get(k);
if (cqd.shouldBeRemovedFromIndex())
{
DocumentDescription dd = cqd.getDocumentDescription();
@@ -222,23 +215,21 @@ public class ExpireThread extends Thread
cqd.setProcessed();
}
}
- k++;
}
}
// Successfully deleted some documents from ingestion system. Now, remove them from job queue. This
// must currently happen one document at a time, because the jobs and connectors for each document
// potentially differ.
- k = 0;
- while (k < arrayDocHashes.size())
+ for (int k = 0; k < arrayDocHashes.size(); k++)
{
if (deleteFromQueue[k])
{
- CleanupQueuedDocument dqd = (CleanupQueuedDocument)arrayDocsToDelete.get(k);
+ CleanupQueuedDocument dqd = arrayDocsToDelete.get(k);
DocumentDescription ddd = dqd.getDocumentDescription();
Long jobID = ddd.getJobID();
- int hopcountMethod = ((Integer)hopcountMethods.get(k)).intValue();
- String[] legalLinkTypes = (String[])arrayRelationshipTypes.get(k);
+ int hopcountMethod = hopcountMethods.get(k).intValue();
+ String[] legalLinkTypes = arrayRelationshipTypes.get(k);
DocumentDescription[] requeueCandidates = jobManager.markDocumentExpired(jobID,legalLinkTypes,ddd,hopcountMethod);
// Use the common method for doing the requeuing
ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,
@@ -246,7 +237,6 @@ public class ExpireThread extends Thread
// Finally, completed expiration of the document.
dqd.setProcessed();
}
- k++;
}
}
finally
@@ -352,18 +342,15 @@ public class ExpireThread extends Thread
{
// Connection name
- protected String connectionName;
+ protected final String connectionName;
// Connection manager
- protected IRepositoryConnectionManager connMgr;
- // Output connection name
- protected String outputConnectionName;
+ protected final IRepositoryConnectionManager connMgr;
/** Constructor */
- public OutputRemoveActivity(String connectionName, IRepositoryConnectionManager connMgr, String outputConnectionName)
+ public OutputRemoveActivity(String connectionName, IRepositoryConnectionManager connMgr)
{
this.connectionName = connectionName;
this.connMgr = connMgr;
- this.outputConnectionName = outputConnectionName;
}
/** Record time-stamped information about the activity of the output connector.
@@ -385,7 +372,7 @@ public class ExpireThread extends Thread
String entityURI, String resultCode, String resultDescription)
throws ManifoldCFException
{
- connMgr.recordHistory(connectionName,startTime,ManifoldCF.qualifyOutputActivityName(activityType,outputConnectionName),dataSize,entityURI,resultCode,
+ connMgr.recordHistory(connectionName,startTime,activityType,dataSize,entityURI,resultCode,
resultDescription,null);
}
}
Added: manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecification.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecification.java?rev=1602613&view=auto
==============================================================================
--- manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecification.java (added)
+++ manifoldcf/branches/CONNECTORS-962/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/PipelineSpecification.java Sat Jun 14 18:14:42 2014
@@ -0,0 +1,71 @@
+/* $Id$ */
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.manifoldcf.crawler.system;
+
+import org.apache.manifoldcf.core.interfaces.*;
+import org.apache.manifoldcf.agents.interfaces.*;
+import org.apache.manifoldcf.crawler.interfaces.*;
+
+/** Class which handles pipeline specifications that include current (new) description strings.
+*/
+public class PipelineSpecification implements IPipelineSpecification
+{
+ protected final IPipelineSpecificationBasic basicSpecification;
+ protected final String[] pipelineDescriptionStrings;
+
+ public PipelineSpecification(IPipelineSpecificationBasic basicSpecification, IJobDescription job, IIncrementalIngester ingester)
+ throws ManifoldCFException, ServiceInterruption
+ {
+ this.basicSpecification = basicSpecification;
+ this.pipelineDescriptionStrings = new String[basicSpecification.getStageCount()];
+ for (int i = 0; i < pipelineDescriptionStrings.length; i++)
+ {
+ // Output stages are described using the job's output specification; every other stage is described using its own pipeline stage specification. Note: this needs to change when output connections become part of the pipeline
+ String descriptionString;
+ if (basicSpecification.checkStageOutputConnection(i))
+ {
+ descriptionString = ingester.getOutputDescription(basicSpecification.getStageConnectionName(i),job.getOutputSpecification());
+ }
+ else
+ {
+ descriptionString = ingester.getTransformationDescription(basicSpecification.getStageConnectionName(i),job.getPipelineStageSpecification(i));
+ }
+ this.pipelineDescriptionStrings[i] = descriptionString;
+ }
+ }
+
+ /** Get the basic pipeline specification.
+ *@return the basic specification that was passed to the constructor.
+ */
+ @Override
+ public IPipelineSpecificationBasic getBasicPipelineSpecification()
+ {
+ return basicSpecification;
+ }
+
+ /** Get the description string for a pipeline stage.
+ *@param stage is the index of the stage to get the description string for.
+ *@return the description string for that stage, as computed in the constructor.
+ */
+ @Override
+ public String getStageDescriptionString(int stage)
+ {
+ return pipelineDescriptionStrings[stage];
+ }
+}