Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2012/08/13 02:44:06 UTC
svn commit: r1372225 [1/2] - in /manifoldcf/trunk: ./
framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/
framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/
framework/pull-agent/src/main/java/org/apache/mani...
Author: kwright
Date: Mon Aug 13 00:44:05 2012
New Revision: 1372225
URL: http://svn.apache.org/viewvc?rev=1372225&view=rev
Log:
Fix for CONNECTORS-501.
Modified:
manifoldcf/trunk/ (props changed)
manifoldcf/trunk/CHANGES.txt
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopCount.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/IntrinsicLink.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/WorkerThread.java
manifoldcf/trunk/tests/filesystem/src/test/java/org/apache/manifoldcf/filesystem_tests/HopcountTester.java
manifoldcf/trunk/tests/webcrawler/src/test/java/org/apache/manifoldcf/webcrawler_tests/BigCrawlTester.java
manifoldcf/trunk/tests/webcrawler/src/test/java/org/apache/manifoldcf/webcrawler_tests/MockWebService.java
Propchange: manifoldcf/trunk/
------------------------------------------------------------------------------
Merged /manifoldcf/branches/CONNECTORS-501:r1370450-1372223
Modified: manifoldcf/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/CHANGES.txt?rev=1372225&r1=1372224&r2=1372225&view=diff
==============================================================================
--- manifoldcf/trunk/CHANGES.txt (original)
+++ manifoldcf/trunk/CHANGES.txt Mon Aug 13 00:44:05 2012
@@ -3,6 +3,11 @@ $Id$
======================= 0.7-dev =====================
+CONNECTORS-501: Fix hopcount logic to return a deterministic
+number of documents. A number of race conditions were discovered
+and corrected.
+(Shigeki Kobayashi, Karl Wright)
+
CONNECTORS-497: Add Lists support to the SharePoint connector.
(Ahmet Arslan, Karl Wright)
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java?rev=1372225&r1=1372224&r2=1372225&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/interfaces/IJobManager.java Mon Aug 13 00:44:05 2012
@@ -274,9 +274,13 @@ public interface IJobManager
public void markDocumentCompleted(DocumentDescription documentDescription)
throws ManifoldCFException;
- /** Note deletion as result of document processing by a job thread of a document.
+ /** Delete from queue as a result of processing of an active document.
+ * The document is expected to be in one of the active states: ACTIVE, ACTIVESEEDING,
+ * ACTIVENEEDSRESCAN, ACTIVESEEDINGNEEDSRESCAN. The RESCAN variants are interpreted
+ * as meaning that the document should not be deleted, but should instead be popped back on the queue for
+ * a repeat processing attempt.
*@param documentDescriptions are the set of description objects for the documents that were processed.
- *@param hopcountMethod is one of complete, partial, or nevercomplete.
+ *@param hopcountMethod describes how to handle deletions for hopcount purposes.
*@return the set of documents for which carrydown data was changed by this operation. These documents are likely
* to be requeued as a result of the change.
*/
@@ -284,9 +288,13 @@ public interface IJobManager
int hopcountMethod)
throws ManifoldCFException;
- /** Note deletion as result of document processing by a job thread of a document.
+ /** Delete from queue as a result of processing of an active document.
+ * The document is expected to be in one of the active states: ACTIVE, ACTIVESEEDING,
+ * ACTIVENEEDSRESCAN, ACTIVESEEDINGNEEDSRESCAN. The RESCAN variants are interpreted
+ * as meaning that the document should not be deleted, but should instead be popped back on the queue for
+ * a repeat processing attempt.
*@param documentDescription is the description object for the document that was processed.
- *@param hopcountMethod is one of complete, partial, or nevercomplete.
+ *@param hopcountMethod describes how to handle deletions for hopcount purposes.
*@return the set of documents for which carrydown data was changed by this operation. These documents are likely
* to be requeued as a result of the change.
*/
@@ -294,6 +302,84 @@ public interface IJobManager
int hopcountMethod)
throws ManifoldCFException;
+ /** Mark hopcount removal from queue as a result of processing of an active document.
+ * The document is expected to be in one of the active states: ACTIVE, ACTIVESEEDING,
+ * ACTIVENEEDSRESCAN, ACTIVESEEDINGNEEDSRESCAN. The RESCAN variants are interpreted
+ * as meaning that the document should not be marked as removed, but should instead be popped back on the queue for
+ * a repeat processing attempt.
+ *@param documentDescriptions are the set of description objects for the documents that were processed.
+ *@param hopcountMethod describes how to handle deletions for hopcount purposes.
+ *@return the set of documents for which carrydown data was changed by this operation. These documents are likely
+ * to be requeued as a result of the change.
+ */
+ public DocumentDescription[] markDocumentHopcountRemovalMultiple(Long jobID, String[] legalLinkTypes, DocumentDescription[] documentDescriptions,
+ int hopcountMethod)
+ throws ManifoldCFException;
+
+ /** Mark hopcount removal from queue as a result of processing of an active document.
+ * The document is expected to be in one of the active states: ACTIVE, ACTIVESEEDING,
+ * ACTIVENEEDSRESCAN, ACTIVESEEDINGNEEDSRESCAN. The RESCAN variants are interpreted
+ * as meaning that the document should not be marked as removed, but should instead be popped back on the queue for
+ * a repeat processing attempt.
+ *@param documentDescription is the description object for the document that was processed.
+ *@param hopcountMethod describes how to handle deletions for hopcount purposes.
+ *@return the set of documents for which carrydown data was changed by this operation. These documents are likely
+ * to be requeued as a result of the change.
+ */
+ public DocumentDescription[] markDocumentHopcountRemoval(Long jobID, String[] legalLinkTypes, DocumentDescription documentDescription,
+ int hopcountMethod)
+ throws ManifoldCFException;
+
+ /** Delete from queue as a result of expiration of an active document.
+ * The document is expected to be in one of the active states: ACTIVE, ACTIVESEEDING,
+ * ACTIVENEEDSRESCAN, ACTIVESEEDINGNEEDSRESCAN. Since the document expired,
+ * no special activity takes place as a result of the document being in a RESCAN state.
+ *@param documentDescriptions are the set of description objects for the documents that were processed.
+ *@param hopcountMethod describes how to handle deletions for hopcount purposes.
+ *@return the set of documents for which carrydown data was changed by this operation. These documents are likely
+ * to be requeued as a result of the change.
+ */
+ public DocumentDescription[] markDocumentExpiredMultiple(Long jobID, String[] legalLinkTypes, DocumentDescription[] documentDescriptions,
+ int hopcountMethod)
+ throws ManifoldCFException;
+
+ /** Delete from queue as a result of expiration of an active document.
+ * The document is expected to be in one of the active states: ACTIVE, ACTIVESEEDING,
+ * ACTIVENEEDSRESCAN, ACTIVESEEDINGNEEDSRESCAN. Since the document expired,
+ * no special activity takes place as a result of the document being in a RESCAN state.
+ *@param documentDescription is the description object for the document that was processed.
+ *@param hopcountMethod describes how to handle deletions for hopcount purposes.
+ *@return the set of documents for which carrydown data was changed by this operation. These documents are likely
+ * to be requeued as a result of the change.
+ */
+ public DocumentDescription[] markDocumentExpired(Long jobID, String[] legalLinkTypes, DocumentDescription documentDescription,
+ int hopcountMethod)
+ throws ManifoldCFException;
+
+ /** Delete from queue as a result of cleaning up an unreachable document.
+ * The document is expected to be in the PURGATORY state. There is never any need to reprocess the
+ * document.
+ *@param documentDescriptions are the set of description objects for the documents that were processed.
+ *@param hopcountMethod describes how to handle deletions for hopcount purposes.
+ *@return the set of documents for which carrydown data was changed by this operation. These documents are likely
+ * to be requeued as a result of the change.
+ */
+ public DocumentDescription[] markDocumentCleanedUpMultiple(Long jobID, String[] legalLinkTypes, DocumentDescription[] documentDescriptions,
+ int hopcountMethod)
+ throws ManifoldCFException;
+
+ /** Delete from queue as a result of cleaning up an unreachable document.
+ * The document is expected to be in the PURGATORY state. There is never any need to reprocess the
+ * document.
+ *@param documentDescription is the description object for the document that was processed.
+ *@param hopcountMethod describes how to handle deletions for hopcount purposes.
+ *@return the set of documents for which carrydown data was changed by this operation. These documents are likely
+ * to be requeued as a result of the change.
+ */
+ public DocumentDescription[] markDocumentCleanedUp(Long jobID, String[] legalLinkTypes, DocumentDescription documentDescription,
+ int hopcountMethod)
+ throws ManifoldCFException;
+
/** Requeue a document set because of carrydown changes.
* This method is called when carrydown data is modified for a set of documents. The documents must be requeued for immediate reprocessing, even to the
* extent that if one is *already* being processed, it will need to be done over again.
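
The split above replaces the old catch-all deletion entry point with purpose-specific variants. A minimal sketch of how a crawler-side caller might route a finished document to the right method, assuming the jobManager, jobID, legalLinkTypes, and hopcountMethod wiring that the framework threads already carry (the Reason enum and RemovalDispatch class are purely illustrative, not part of the framework):

import org.apache.manifoldcf.core.interfaces.ManifoldCFException;
import org.apache.manifoldcf.crawler.interfaces.DocumentDescription;
import org.apache.manifoldcf.crawler.interfaces.IJobManager;

public class RemovalDispatch
{
  /** Illustrative reasons a document can leave the queue. */
  public enum Reason { PROCESSED, HOPCOUNT_REMOVED, EXPIRED, UNREACHABLE }

  /** Route a document to the appropriate removal method; the returned
  * documents are the carrydown children that must be requeued. */
  public static DocumentDescription[] remove(IJobManager jobManager,
    Long jobID, String[] legalLinkTypes, DocumentDescription dd,
    int hopcountMethod, Reason reason)
    throws ManifoldCFException
  {
    switch (reason)
    {
    case HOPCOUNT_REMOVED:
      // Active document dropped for exceeding the hop limit; RESCAN variants are requeued instead.
      return jobManager.markDocumentHopcountRemoval(jobID,legalLinkTypes,dd,hopcountMethod);
    case EXPIRED:
      // Active document whose expiration time arrived; RESCAN state gets no special handling.
      return jobManager.markDocumentExpired(jobID,legalLinkTypes,dd,hopcountMethod);
    case UNREACHABLE:
      // PURGATORY document being cleaned up; never reprocessed.
      return jobManager.markDocumentCleanedUp(jobID,legalLinkTypes,dd,hopcountMethod);
    default:
      // Ordinary deletion as a direct result of processing.
      return jobManager.markDocumentDeleted(jobID,legalLinkTypes,dd,hopcountMethod);
    }
  }
}
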
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopCount.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopCount.java?rev=1372225&r1=1372224&r2=1372225&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopCount.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/HopCount.java Mon Aug 13 00:44:05 2012
@@ -364,20 +364,20 @@ public class HopCount extends org.apache
/** Record a reference from source to target. This reference will be marked as "new" or "existing".
*/
- public void recordReference(Long jobID, String[] legalLinkTypes, String sourceDocumentIDHash, String targetDocumentIDHash, String linkType,
+ public boolean recordReference(Long jobID, String[] legalLinkTypes, String sourceDocumentIDHash, String targetDocumentIDHash, String linkType,
int hopcountMethod)
throws ManifoldCFException
{
- doRecord(jobID,legalLinkTypes,sourceDocumentIDHash,new String[]{targetDocumentIDHash},linkType,hopcountMethod);
+ return doRecord(jobID,legalLinkTypes,sourceDocumentIDHash,new String[]{targetDocumentIDHash},linkType,hopcountMethod)[0];
}
/** Record a set of references from source to target. This reference will be marked as "new" or "existing".
*/
- public void recordReferences(Long jobID, String[] legalLinkTypes, String sourceDocumentIDHash, String[] targetDocumentIDHashes, String linkType,
+ public boolean[] recordReferences(Long jobID, String[] legalLinkTypes, String sourceDocumentIDHash, String[] targetDocumentIDHashes, String linkType,
int hopcountMethod)
throws ManifoldCFException
{
- doRecord(jobID,legalLinkTypes,sourceDocumentIDHash,targetDocumentIDHashes,linkType,hopcountMethod);
+ return doRecord(jobID,legalLinkTypes,sourceDocumentIDHash,targetDocumentIDHashes,linkType,hopcountMethod);
}
/** Complete a recalculation pass for a set of source documents. All child links that are not marked as "new"
@@ -390,13 +390,19 @@ public class HopCount extends org.apache
}
/** Do the work of recording source-target references. */
- protected void doRecord(Long jobID, String[] legalLinkTypes, String sourceDocumentIDHash, String[] targetDocumentIDHashes, String linkType,
+ protected boolean[] doRecord(Long jobID, String[] legalLinkTypes, String sourceDocumentIDHash, String[] targetDocumentIDHashes, String linkType,
int hopcountMethod)
throws ManifoldCFException
{
// We have to both add the reference, AND invalidate appropriate cached hopcounts (if it is a NEW
// link.)
+ boolean[] rval = new boolean[targetDocumentIDHashes.length];
+ for (int i = 0; i < rval.length; i++)
+ {
+ rval[i] = false;
+ }
+
beginTransaction();
try
{
@@ -404,6 +410,19 @@ public class HopCount extends org.apache
if (newReferences.length > 0)
{
// There are added links.
+
+ // First, note them in return value
+ Set<String> newSet = new HashSet<String>();
+ for (int i = 0; i < newReferences.length; i++)
+ {
+ newSet.add(newReferences[i]);
+ }
+ for (int i = 0; i < rval.length; i++)
+ {
+ if (newSet.contains(targetDocumentIDHashes[i]) &&
+ (sourceDocumentIDHash==null || !sourceDocumentIDHash.equals(targetDocumentIDHashes[i])))
+ rval[i] = true;
+ }
// The add causes hopcount records to be queued for processing (and created if they don't exist).
// ALL the hopcount records for the target document ids must be queued, for all the link types
@@ -437,10 +456,9 @@ public class HopCount extends org.apache
if (sourceDocumentIDHash == null || sourceDocumentIDHash.length() == 0)
{
- int i = 0;
- while (i < estimates.length)
+ for (int i = 0; i < estimates.length; i++)
{
- estimates[i++] = new Answer(0);
+ estimates[i] = new Answer(0);
}
}
else
@@ -459,19 +477,16 @@ public class HopCount extends org.apache
new MultiClause(linkTypeField,legalLinkTypes)}));
IResultSet set = performQuery(sb.toString(),list,null,null);
- HashMap answerMap = new HashMap();
- int i = 0;
- while (i < estimates.length)
+ Map<String,Answer> answerMap = new HashMap<String,Answer>();
+ for (int i = 0; i < estimates.length; i++)
{
estimates[i] = new Answer(ANSWER_INFINITY);
answerMap.put(legalLinkTypes[i],estimates[i]);
- i++;
}
- i = 0;
- while (i < set.getRowCount())
+ for (int i = 0; i < set.getRowCount(); i++)
{
- IResultRow row = set.getRow(i++);
+ IResultRow row = set.getRow(i);
Long id = (Long)row.getValue(idField);
DeleteDependency[] dds;
if (hopcountMethod != IJobDescription.HOPCOUNT_NEVERDELETE)
@@ -480,7 +495,7 @@ public class HopCount extends org.apache
dds = new DeleteDependency[0];
Long distance = (Long)row.getValue(distanceField);
String recordedLinkType = (String)row.getValue(linkTypeField);
- Answer a = (Answer)answerMap.get(recordedLinkType);
+ Answer a = answerMap.get(recordedLinkType);
int recordedDistance = (int)distance.longValue();
if (recordedDistance != -1)
{
@@ -495,6 +510,7 @@ public class HopCount extends org.apache
if (Logging.hopcount.isDebugEnabled())
Logging.hopcount.debug("Done queueing "+Integer.toString(targetDocumentIDHashes.length)+" documents");
}
+ return rval;
}
catch (ManifoldCFException e)
{
@@ -571,13 +587,11 @@ public class HopCount extends org.apache
//
// ... and then, re-evaluate all hopcount records and their dependencies that are marked for delete.
//
- // But, the trick is that both source and target links must go away!! So deleting a document is very different than
- // updating a link...
+
// This also removes the links themselves...
if (hopcountMethod == IJobDescription.HOPCOUNT_ACCURATE)
doDeleteDocuments(jobID,documentHashes);
-
}
catch (ManifoldCFException e)
@@ -733,7 +747,7 @@ public class HopCount extends org.apache
/** Limited find for missing records.
*/
- protected void performFindMissingRecords(Long jobID, String[] affectedLinkTypes, ArrayList list, Map matchMap)
+ protected void performFindMissingRecords(Long jobID, String[] affectedLinkTypes, ArrayList list, Map<Question,Long> matchMap)
throws ManifoldCFException
{
ArrayList newList = new ArrayList();
@@ -796,19 +810,16 @@ public class HopCount extends org.apache
if (Logging.hopcount.isDebugEnabled())
{
Logging.hopcount.debug("Adding "+Integer.toString(documentIDHashes.length)+" documents to processing queue");
- int z = 0;
- while (z < documentIDHashes.length)
+ for (int z = 0; z < documentIDHashes.length; z++)
{
- Logging.hopcount.debug(" Adding '"+documentIDHashes[z++]+"' to processing queue");
+ Logging.hopcount.debug(" Adding '"+documentIDHashes[z]+"' to processing queue");
}
Logging.hopcount.debug("The source id is '"+sourceDocumentIDHash+"' and linktype is '"+linkType+"', and there are "+
Integer.toString(affectedLinkTypes.length)+" affected link types, as below:");
- z = 0;
- while (z < affectedLinkTypes.length)
+ for (int z = 0; z < affectedLinkTypes.length; z++)
{
Logging.hopcount.debug(" Linktype '"+affectedLinkTypes[z]+"', current distance "+Integer.toString(startingAnswers[z].getAnswer())+" with "+
Integer.toString(startingAnswers[z].countDeleteDependencies())+" delete dependencies.");
- z++;
}
}
@@ -817,15 +828,13 @@ public class HopCount extends org.apache
// so we can make sure they are added to the queue properly.
// Make a map of the combinations of link type and document id we want to have present
- HashMap matchMap = new HashMap();
+ Map<Question,Long> matchMap = new HashMap<Question,Long>();
// Make a map from the link type to the corresponding Answer object
- HashMap answerMap = new HashMap();
- int u = 0;
- while (u < affectedLinkTypes.length)
+ Map<String,Answer> answerMap = new HashMap<String,Answer>();
+ for (int u = 0; u < affectedLinkTypes.length; u++)
{
answerMap.put(affectedLinkTypes[u],startingAnswers[u]);
- u++;
}
// Do this in a transaction
@@ -838,9 +847,8 @@ public class HopCount extends org.apache
int maxClause = maxClausePerformFindMissingRecords(jobID,affectedLinkTypes);
ArrayList list = new ArrayList();
- int i = 0;
int k = 0;
- while (i < documentIDHashes.length)
+ for (int i = 0; i < documentIDHashes.length; i++)
{
String documentIDHash = documentIDHashes[i];
@@ -853,7 +861,6 @@ public class HopCount extends org.apache
list.add(documentIDHash);
k++;
- i++;
}
if (k > 0)
performFindMissingRecords(jobID,affectedLinkTypes,list,matchMap);
@@ -864,12 +871,10 @@ public class HopCount extends org.apache
// for queuing.
HashMap map = new HashMap();
- i = 0;
- while (i < documentIDHashes.length)
+ for (int i = 0; i < documentIDHashes.length; i++)
{
String documentIDHash = documentIDHashes[i];
- int j = 0;
- while (j < affectedLinkTypes.length)
+ for (int j = 0; j < affectedLinkTypes.length; j++)
{
String affectedLinkType = affectedLinkTypes[j];
Question q = new Question(documentIDHash,affectedLinkType);
@@ -927,9 +932,7 @@ public class HopCount extends org.apache
matchMap.remove(q);
}
}
- j++;
}
- i++;
}
// For all the records still in the matchmap, queue them.
@@ -943,26 +946,24 @@ public class HopCount extends org.apache
StringBuilder sb = new StringBuilder();
list = new ArrayList();
k = 0;
- i = 0;
- while (k < documentIDHashes.length)
+ for (int i = 0; i < documentIDHashes.length; i++)
{
- String documentIDHash = documentIDHashes[k];
- int j = 0;
- while (j < affectedLinkTypes.length)
+ String documentIDHash = documentIDHashes[i];
+ for (int j = 0; j < affectedLinkTypes.length; j++)
{
String affectedLinkType = affectedLinkTypes[j];
Question q = new Question(documentIDHash,affectedLinkType);
if (matchMap.get(q) != null)
{
- if (i == maxClause)
+ if (k == maxClause)
{
performMarkAddDeps(sb.toString(),list);
- i = 0;
+ k = 0;
sb.setLength(0);
list.clear();
}
- if (i > 0)
+ if (k > 0)
sb.append(" OR ");
// We only want to queue up hopcount records that correspond to the affected link types.
@@ -975,17 +976,17 @@ public class HopCount extends org.apache
sb.append(buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(jobIDField,jobID),
- new UnitaryClause(markForDeathField,markToString(MARK_QUEUED)),
+ new MultiClause(markForDeathField,new Object[]{
+ markToString(MARK_NORMAL),
+ markToString(MARK_DELETING)}),
new UnitaryClause(parentIDHashField,documentIDHash),
new UnitaryClause(linkTypeField,affectedLinkType)}));
- i++;
+ k++;
}
- j++;
}
- k++;
}
- if (i > 0)
+ if (k > 0)
performMarkAddDeps(sb.toString(),list);
// Leave the dependency records for the queued rows. This will save lots of work if we decide not to
@@ -1064,7 +1065,7 @@ public class HopCount extends org.apache
}
- /** Invalidate links that start with or end in a specific set of documents, described by
+ /** Invalidate links that start with a specific set of documents, described by
* a table join.
*/
protected void doDeleteDocuments(Long jobID,
@@ -1121,39 +1122,7 @@ public class HopCount extends org.apache
performUpdate(map,sb.toString(),list,null);
noteModifications(0,1,0);
-
- sb = new StringBuilder("WHERE ");
- list = new ArrayList();
-
- sb.append(idField).append(" IN(SELECT t0.").append(deleteDepsManager.ownerIDField).append(" FROM ")
- .append(deleteDepsManager.getTableName()).append(" t0,").append(joinTableName).append(",")
- .append(intrinsicLinkManager.getTableName()).append(" t1 WHERE ");
-
- sb.append(buildConjunctionClause(list,new ClauseDescription[]{
- new UnitaryClause("t0."+deleteDepsManager.jobIDField,jobID)})).append(" AND ");
-
- sb.append(buildConjunctionClause(list,new ClauseDescription[]{
- new UnitaryClause("t1."+intrinsicLinkManager.jobIDField,jobID),
- new JoinClause("t1."+intrinsicLinkManager.parentIDHashField,"t0."+deleteDepsManager.parentIDHashField),
- new JoinClause("t1."+intrinsicLinkManager.linkTypeField,"t0."+deleteDepsManager.linkTypeField),
- new JoinClause("t1."+intrinsicLinkManager.childIDHashField,"t0."+deleteDepsManager.childIDHashField)})).append(" AND ");
-
- sb.append(buildConjunctionClause(list,new ClauseDescription[]{
- new UnitaryClause(joinTableJobColumn,jobID),
- new JoinClause(joinTableIDColumn,"t0."+deleteDepsManager.parentIDHashField)})).append(" AND ");
-
- sb.append(joinTableCriteria);
- list.addAll(joinTableParams);
-
- sb.append(")");
-
- map = new HashMap();
- // These are whacked back to "infinity" to avoid infinite looping in a cut-off graph.
- map.put(distanceField,new Long(-1L));
- map.put(markForDeathField,markToString(MARK_DELETING));
- performUpdate(map,sb.toString(),list,null);
- noteModifications(0,1,0);
-
+ // We do NOT do the parentID because otherwise we have the potential to delete links that we need later. See CONNECTORS-501.
if (Logging.hopcount.isDebugEnabled())
Logging.hopcount.debug("Done setting hopcount rows for job "+jobID+" to initial distances");
@@ -1186,7 +1155,7 @@ public class HopCount extends org.apache
}
- /** Invalidate links that start with or end in a specific set of documents.
+ /** Invalidate links that start with a specific set of documents.
*/
protected void doDeleteDocuments(Long jobID,
String[] documentHashes)
@@ -1307,28 +1276,8 @@ public class HopCount extends org.apache
map.put(markForDeathField,markToString(MARK_DELETING));
performUpdate(map,sb.toString(),thisList,null);
- sb = new StringBuilder("WHERE ");
- thisList = new ArrayList();
-
- sb.append(idField).append(" IN(SELECT ").append(deleteDepsManager.ownerIDField).append(" FROM ").append(deleteDepsManager.getTableName()).append(" t0 WHERE ")
- .append(buildConjunctionClause(thisList,new ClauseDescription[]{
- new UnitaryClause("t0."+deleteDepsManager.jobIDField,jobID),
- new MultiClause("t0."+deleteDepsManager.parentIDHashField,list)})).append(" AND ");
-
- sb.append("EXISTS(SELECT 'x' FROM ").append(intrinsicLinkManager.getTableName()).append(" t1 WHERE ")
- .append(buildConjunctionClause(thisList,new ClauseDescription[]{
- new JoinClause("t1."+intrinsicLinkManager.jobIDField,"t0."+deleteDepsManager.jobIDField),
- new JoinClause("t1."+intrinsicLinkManager.linkTypeField,"t0."+deleteDepsManager.linkTypeField),
- new JoinClause("t1."+intrinsicLinkManager.parentIDHashField,"t0."+deleteDepsManager.parentIDHashField),
- new JoinClause("t1."+intrinsicLinkManager.childIDHashField,"t0."+deleteDepsManager.childIDHashField)}));
-
- sb.append("))");
-
- map = new HashMap();
- // These are whacked back to "infinity" to avoid infinite looping in a cut-off graph.
- map.put(distanceField,new Long(-1L));
- map.put(markForDeathField,markToString(MARK_DELETING));
- performUpdate(map,sb.toString(),thisList,null);
+ // We do NOT do the parentID because we need to leave intrinsic links around that could be used again.
+ // See CONNECTORS-501.
}
/** Invalidate links meeting a simple criteria which have a given set of source documents. This also runs a queue
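
recordReference and recordReferences now report, per target, whether the recorded link was genuinely new. A minimal sketch of how a caller can consume that signal to decide whether previously hopcount-removed rows must be revived, mirroring the JobManager change later in this commit (the variable names and surrounding wiring are assumptions):

// Sketch: hopCount and jobQueue are the framework's HopCount and JobQueue managers.
boolean[] isNewLink = hopCount.recordReferences(jobID,legalLinkTypes,
  parentIdentifierHash,targetDocIDHashes,relationshipType,hopcountMethod);

boolean sawNewLink = false;
for (int z = 0; z < isNewLink.length; z++)
{
  // A brand-new link can re-legitimize a document that was previously
  // removed for exceeding the hop limit.
  if (isNewLink[z])
  {
    sawNewLink = true;
    break;
  }
}
if (sawNewLink)
  jobQueue.reactivateHopcountRemovedRecords(jobID);
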
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/IntrinsicLink.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/IntrinsicLink.java?rev=1372225&r1=1372224&r2=1372225&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/IntrinsicLink.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/IntrinsicLink.java Mon Aug 13 00:44:05 2012
@@ -354,24 +354,9 @@ public class IntrinsicLink extends org.a
performDelete(sb.toString(),list,null);
noteModifications(0,0,1);
-
- // Delete matches for parentIDHashField
- sb = new StringBuilder("WHERE ");
- list = new ArrayList();
-
- sb.append("EXISTS(SELECT 'x' FROM ").append(joinTableName).append(" WHERE ")
- .append(buildConjunctionClause(list,new ClauseDescription[]{
- new UnitaryClause(joinTableJobColumn,jobID),
- new JoinClause(joinTableIDColumn,getTableName()+"."+parentIDHashField)})).append(" AND ");
-
- sb.append(joinTableCriteria);
- list.addAll(joinTableParams);
-
- sb.append(")");
-
- performDelete(sb.toString(),list,null);
- noteModifications(0,0,1);
-
+
+ // DON'T delete ParentID matches; we need to leave those around for bookkeeping to
+ // be correct. See CONNECTORS-501.
}
catch (ManifoldCFException e)
{
@@ -451,14 +436,8 @@ public class IntrinsicLink extends org.a
new UnitaryClause(jobIDField,jobID),
new MultiClause(childIDHashField,list)}));
performDelete(sb.toString(),thisList,null);
-
- sb = new StringBuilder("WHERE ");
- thisList = new ArrayList();
-
- sb.append(buildConjunctionClause(thisList,new ClauseDescription[]{
- new UnitaryClause(jobIDField,jobID),
- new MultiClause(parentIDHashField,list)}));
- performDelete(sb.toString(),thisList,null);
+
+ // DON'T do parentID matches; we need to leave those around. See CONNECTORS-501.
}
/** Remove all target links of the specified source documents that are not marked as "new" or "existing", and
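
Both deletions in this class now touch only the child side of each link. To make that asymmetry concrete, here is a toy in-memory version of the retained behavior (the Link and ToyIntrinsicLinks types are illustrative, not part of the framework):

import java.util.Iterator;
import java.util.List;

/** Toy model of an intrinsic link row: job + link type + parent + child. */
class Link
{
  Long jobID; String linkType; String parentIDHash; String childIDHash;
  Link(Long jobID, String linkType, String parent, String child)
  { this.jobID = jobID; this.linkType = linkType; this.parentIDHash = parent; this.childIDHash = child; }
}

class ToyIntrinsicLinks
{
  /** Mimics the retained delete above: drop only links whose *child* is a
  * removed document. Links pointing *to* removed documents survive, so the
  * bookkeeping stays correct if they are referenced again (CONNECTORS-501). */
  static void removeDocumentLinks(List<Link> table, Long jobID, List<String> removedHashes)
  {
    Iterator<Link> it = table.iterator();
    while (it.hasNext())
    {
      Link l = it.next();
      if (l.jobID.equals(jobID) && removedHashes.contains(l.childIDHash))
        it.remove();
      // Note: no parentIDHash clause, by design.
    }
  }
}
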
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1372225&r1=1372224&r2=1372225&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Mon Aug 13 00:44:05 2012
@@ -1469,6 +1469,7 @@ public class JobManager implements IJobM
.append(" FROM ").append(jobQueue.getTableName()).append(" t0 WHERE ")
.append(database.buildConjunctionClause(list,new ClauseDescription[]{
new MultiClause(jobQueue.statusField,new Object[]{
+ JobQueue.statusToString(jobQueue.STATUS_HOPCOUNTREMOVED),
JobQueue.statusToString(jobQueue.STATUS_PENDING),
JobQueue.statusToString(jobQueue.STATUS_PENDINGPURGATORY)}),
new UnitaryClause(jobQueue.prioritySetField,"<",new Long(currentTime))})).append(" AND ")
@@ -2474,7 +2475,11 @@ public class JobManager implements IJobM
markDocumentCompletedMultiple(new DocumentDescription[]{documentDescription});
}
- /** Note deletion as result of document processing by a job thread of a document.
+ /** Delete from queue as a result of processing of an active document.
+ * The document is expected to be in one of the active states: ACTIVE, ACTIVESEEDING,
+ * ACTIVENEEDSRESCAN, ACTIVESEEDINGNEEDSRESCAN. The RESCAN variants are interpreted
+ * as meaning that the document should not be deleted, but should instead be popped back on the queue for
+ * a repeat processing attempt.
*@param documentDescriptions are the set of description objects for the documents that were processed.
*@param hopcountMethod describes how to handle deletions for hopcount purposes.
*@return the set of documents for which carrydown data was changed by this operation. These documents are likely
@@ -2484,6 +2489,237 @@ public class JobManager implements IJobM
int hopcountMethod)
throws ManifoldCFException
{
+ // Conditional deletion no longer needs to be handled here; that's been
+ // taken over by the hopcount removal method below. So just use the simple 'delete' functionality.
+ return doDeleteMultiple(jobID,legalLinkTypes,documentDescriptions,hopcountMethod);
+ }
+
+ /** Delete from queue as a result of processing of an active document.
+ * The document is expected to be in one of the active states: ACTIVE, ACTIVESEEDING,
+ * ACTIVENEEDSRESCAN, ACTIVESEEDINGNEEDSRESCAN. The RESCAN variants are interpreted
+ * as meaning that the document should not be deleted, but should instead be popped back on the queue for
+ * a repeat processing attempt.
+ *@param documentDescription is the description object for the document that was processed.
+ *@param hopcountMethod describes how to handle deletions for hopcount purposes.
+ *@return the set of documents for which carrydown data was changed by this operation. These documents are likely
+ * to be requeued as a result of the change.
+ */
+ public DocumentDescription[] markDocumentDeleted(Long jobID, String[] legalLinkTypes, DocumentDescription documentDescription,
+ int hopcountMethod)
+ throws ManifoldCFException
+ {
+ return markDocumentDeletedMultiple(jobID,legalLinkTypes,new DocumentDescription[]{documentDescription},hopcountMethod);
+ }
+
+ /** Mark hopcount removal from queue as a result of processing of an active document.
+ * The document is expected to be in one of the active states: ACTIVE, ACTIVESEEDING,
+ * ACTIVENEEDSRESCAN, ACTIVESEEDINGNEEDSRESCAN. The RESCAN variants are interpreted
+ * as meaning that the document should not be marked as removed, but should instead be popped back on the queue for
+ * a repeat processing attempt.
+ *@param documentDescriptions are the set of description objects for the documents that were processed.
+ *@param hopcountMethod describes how to handle deletions for hopcount purposes.
+ *@return the set of documents for which carrydown data was changed by this operation. These documents are likely
+ * to be requeued as a result of the change.
+ */
+ public DocumentDescription[] markDocumentHopcountRemovalMultiple(Long jobID, String[] legalLinkTypes, DocumentDescription[] documentDescriptions,
+ int hopcountMethod)
+ throws ManifoldCFException
+ {
+ // For each record, we're going to have to choose between marking it as "hopcount removed", and marking
+ // it for rescan. So the basic flow will involve changing a document's status.
+
+ // Before we can change a document status, we need to know the *current* status. Therefore, a SELECT xxx FOR UPDATE/UPDATE
+ // transaction is needed in order to complete these documents correctly.
+ //
+ // Since we are therefore setting row locks on the jobqueue table, we need to work to avoid unnecessary deadlocking. To do that, we have to
+ // lock rows in document id hash order!! Luckily, the DocumentDescription objects have a document identifier buried within, which we can use to
+ // order the "select for update" operations appropriately.
+ //
+
+ HashMap indexMap = new HashMap();
+ String[] docIDHashes = new String[documentDescriptions.length];
+
+ int i = 0;
+ while (i < documentDescriptions.length)
+ {
+ String documentIDHash = documentDescriptions[i].getDocumentIdentifierHash() + ":" + documentDescriptions[i].getJobID();
+ docIDHashes[i] = documentIDHash;
+ indexMap.put(documentIDHash,new Integer(i));
+ i++;
+ }
+
+ java.util.Arrays.sort(docIDHashes);
+
+ // Retry loop - in case we get a deadlock despite our best efforts
+ while (true)
+ {
+ long sleepAmt = 0L;
+ database.beginTransaction(database.TRANSACTION_SERIALIZED);
+ try
+ {
+ // Do one row at a time, to avoid deadlocking things
+ List<String> deleteList = new ArrayList<String>();
+
+ i = 0;
+ while (i < docIDHashes.length)
+ {
+ String docIDHash = docIDHashes[i];
+
+ // Get the DocumentDescription object
+ DocumentDescription dd = documentDescriptions[((Integer)indexMap.get(docIDHash)).intValue()];
+
+ // Query for the status
+ ArrayList list = new ArrayList();
+ String query = database.buildConjunctionClause(list,new ClauseDescription[]{
+ new UnitaryClause(jobQueue.idField,dd.getID())});
+ IResultSet set = database.performQuery("SELECT "+jobQueue.statusField+" FROM "+jobQueue.getTableName()+" WHERE "+
+ query+" FOR UPDATE",list,null,null);
+ if (set.getRowCount() > 0)
+ {
+ IResultRow row = set.getRow(0);
+ // Grab the status
+ int status = jobQueue.stringToStatus((String)row.getValue(jobQueue.statusField));
+ // Update the jobqueue table
+ boolean didDelete = jobQueue.updateOrHopcountRemoveRecord(dd.getID(),status);
+ if (didDelete)
+ {
+ deleteList.add(dd.getDocumentIdentifierHash());
+ }
+ }
+ i++;
+ }
+
+ String[] docIDSimpleHashes = new String[deleteList.size()];
+ for (int j = 0; j < docIDSimpleHashes.length; j++)
+ {
+ docIDSimpleHashes[j] = deleteList.get(j);
+ }
+
+ // Next, find the documents that are affected by carrydown deletion.
+ DocumentDescription[] rval = calculateAffectedDeleteCarrydownChildren(jobID,docIDSimpleHashes);
+
+ // Since hopcount inheritance and prerequisites came from the addDocument() method,
+ // we don't delete them here.
+
+ database.performCommit();
+ return rval;
+ }
+ catch (ManifoldCFException e)
+ {
+ database.signalRollback();
+ if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+ {
+ if (Logging.perf.isDebugEnabled())
+ Logging.perf.debug("Aborted transaction marking completed "+Integer.toString(docIDHashes.length)+
+ " docs: "+e.getMessage());
+ sleepAmt = getRandomAmount();
+ continue;
+ }
+ throw e;
+ }
+ catch (Error e)
+ {
+ database.signalRollback();
+ throw e;
+ }
+ finally
+ {
+ database.endTransaction();
+ sleepFor(sleepAmt);
+ }
+ }
+ }
+
+ /** Mark hopcount removal from queue as a result of processing of an active document.
+ * The document is expected to be in one of the active states: ACTIVE, ACTIVESEEDING,
+ * ACTIVENEEDSRESCAN, ACTIVESEEDINGNEEDSRESCAN. The RESCAN variants are interpreted
+ * as meaning that the document should not be marked as removed, but should instead be popped back on the queue for
+ * a repeat processing attempt.
+ *@param documentDescription is the description object for the document that was processed.
+ *@param hopcountMethod describes how to handle deletions for hopcount purposes.
+ *@return the set of documents for which carrydown data was changed by this operation. These documents are likely
+ * to be requeued as a result of the change.
+ */
+ public DocumentDescription[] markDocumentHopcountRemoval(Long jobID, String[] legalLinkTypes, DocumentDescription documentDescription,
+ int hopcountMethod)
+ throws ManifoldCFException
+ {
+ return markDocumentHopcountRemovalMultiple(jobID,legalLinkTypes,new DocumentDescription[]{documentDescription},hopcountMethod);
+ }
+
+ /** Delete from queue as a result of expiration of an active document.
+ * The document is expected to be in one of the active states: ACTIVE, ACTIVESEEDING,
+ * ACTIVENEEDSRESCAN, ACTIVESEEDINGNEEDSRESCAN. Since the document expired,
+ * no special activity takes place as a result of the document being in a RESCAN state.
+ *@param documentDescriptions are the set of description objects for the documents that were processed.
+ *@param hopcountMethod describes how to handle deletions for hopcount purposes.
+ *@return the set of documents for which carrydown data was changed by this operation. These documents are likely
+ * to be requeued as a result of the change.
+ */
+ public DocumentDescription[] markDocumentExpiredMultiple(Long jobID, String[] legalLinkTypes, DocumentDescription[] documentDescriptions,
+ int hopcountMethod)
+ throws ManifoldCFException
+ {
+ return doDeleteMultiple(jobID,legalLinkTypes,documentDescriptions,hopcountMethod);
+ }
+
+ /** Delete from queue as a result of expiration of an active document.
+ * The document is expected to be in one of the active states: ACTIVE, ACTIVESEEDING,
+ * ACTIVENEEDSRESCAN, ACTIVESEEDINGNEEDSRESCAN. Since the document expired,
+ * no special activity takes place as a result of the document being in a RESCAN state.
+ *@param documentDescription is the description object for the document that was processed.
+ *@param hopcountMethod describes how to handle deletions for hopcount purposes.
+ *@return the set of documents for which carrydown data was changed by this operation. These documents are likely
+ * to be requeued as a result of the change.
+ */
+ public DocumentDescription[] markDocumentExpired(Long jobID, String[] legalLinkTypes, DocumentDescription documentDescription,
+ int hopcountMethod)
+ throws ManifoldCFException
+ {
+ return markDocumentExpiredMultiple(jobID,legalLinkTypes,new DocumentDescription[]{documentDescription},hopcountMethod);
+ }
+
+ /** Delete from queue as a result of cleaning up an unreachable document.
+ * The document is expected to be in the PURGATORY state. There is never any need to reprocess the
+ * document.
+ *@param documentDescriptions are the set of description objects for the documents that were processed.
+ *@param hopcountMethod describes how to handle deletions for hopcount purposes.
+ *@return the set of documents for which carrydown data was changed by this operation. These documents are likely
+ * to be requeued as a result of the change.
+ */
+ public DocumentDescription[] markDocumentCleanedUpMultiple(Long jobID, String[] legalLinkTypes, DocumentDescription[] documentDescriptions,
+ int hopcountMethod)
+ throws ManifoldCFException
+ {
+ return doDeleteMultiple(jobID,legalLinkTypes,documentDescriptions,hopcountMethod);
+ }
+
+ /** Delete from queue as a result of cleaning up an unreachable document.
+ * The document is expected to be in the PURGATORY state. There is never any need to reprocess the
+ * document.
+ *@param documentDescription is the description object for the document that was processed.
+ *@param hopcountMethod describes how to handle deletions for hopcount purposes.
+ *@return the set of documents for which carrydown data was changed by this operation. These documents are likely
+ * to be requeued as a result of the change.
+ */
+ public DocumentDescription[] markDocumentCleanedUp(Long jobID, String[] legalLinkTypes, DocumentDescription documentDescription,
+ int hopcountMethod)
+ throws ManifoldCFException
+ {
+ return markDocumentCleanedUpMultiple(jobID,legalLinkTypes,new DocumentDescription[]{documentDescription},hopcountMethod);
+ }
+
+ /** Delete documents with no repercussions. We don't have to worry about the current state of each document,
+ * since the document is definitely going away.
+ *@param documentDescriptions are the set of description objects for the documents that were processed.
+ *@param hopcountMethod describes how to handle deletions for hopcount purposes.
+ *@return the set of documents for which carrydown data was changed by this operation. These documents are likely
+ * to be requeued as a result of the change.
+ */
+ protected DocumentDescription[] doDeleteMultiple(Long jobID, String[] legalLinkTypes, DocumentDescription[] documentDescriptions,
+ int hopcountMethod)
+ throws ManifoldCFException
+ {
if (documentDescriptions.length == 0)
return new DocumentDescription[0];
@@ -2684,18 +2920,6 @@ public class JobManager implements IJobM
}
}
- /** Note deletion as result of document processing by a job thread of a document.
- *@param documentDescription is the description object for the document that was processed.
- *@param hopcountMethod describes how to handle deletions for hopcount purposes.
- *@return the set of documents for which carrydown data was changed by this operation. These documents are likely
- * to be requeued as a result of the change.
- */
- public DocumentDescription[] markDocumentDeleted(Long jobID, String[] legalLinkTypes, DocumentDescription documentDescription,
- int hopcountMethod)
- throws ManifoldCFException
- {
- return markDocumentDeletedMultiple(jobID,legalLinkTypes,new DocumentDescription[]{documentDescription},hopcountMethod);
- }
/** Requeue a document for further processing in the future.
@@ -3845,8 +4069,7 @@ public class JobManager implements IJobM
// Go through document id's one at a time, in order - mainly to prevent deadlock as much as possible. Search for any existing row in jobqueue first (for update)
HashMap existingRows = new HashMap();
- int z = 0;
- while (z < reorderedDocIDHashes.length)
+ for (int z = 0; z < reorderedDocIDHashes.length; z++)
{
String docIDHash = reorderedDocIDHashes[z];
@@ -3885,17 +4108,22 @@ public class JobManager implements IJobM
jobQueue.insertNewRecord(jobID,docIDHash,reorderedDocumentIdentifiers[z],reorderedDocumentPriorities[z],0L,currentTime,reorderedDocumentPrerequisites[z]);
}
- z++;
}
// Update all the carrydown data at once, for greatest efficiency.
boolean[] carrydownChangesSeen = carryDown.recordCarrydownDataMultiple(jobID,parentIdentifierHash,reorderedDocIDHashes,dataNames,dataHashValues,dataValues);
+ // Same with hopcount.
+ boolean[] hopcountChangesSeen = null;
+ if (parentIdentifierHash != null && relationshipType != null)
+ hopcountChangesSeen = hopCount.recordReferences(jobID,legalLinkTypes,parentIdentifierHash,reorderedDocIDHashes,relationshipType,hopcountMethod);
+
// Loop through the document id's again, and perform updates where needed
boolean[] reorderedRval = new boolean[reorderedDocIDHashes.length];
- z = 0;
- while (z < reorderedDocIDHashes.length)
+ boolean reactivateRemovedHopcountRecords = false;
+
+ for (int z = 0; z < reorderedDocIDHashes.length; z++)
{
String docIDHash = reorderedDocIDHashes[z];
JobqueueRecord jr = (JobqueueRecord)existingRows.get(docIDHash);
@@ -3903,14 +4131,22 @@ public class JobManager implements IJobM
// It was an insert
reorderedRval[z] = true;
else
+ {
// It was an existing row; do the update logic
+ // The hopcountChangesSeen array describes whether each reference is a new one. This
+ // helps us determine whether we're going to need to "flip" HOPCOUNTREMOVED documents
+ // to the PENDING state. If a new link ends at an existing record, then we need to flip them all.
reorderedRval[z] = jobQueue.updateExistingRecord(jr.getRecordID(),jr.getStatus(),jr.getCheckTimeValue(),
- 0L,currentTime,carrydownChangesSeen[z],reorderedDocumentPriorities[z],reorderedDocumentPrerequisites[z]);
- z++;
+ 0L,currentTime,carrydownChangesSeen[z] || (hopcountChangesSeen!=null && hopcountChangesSeen[z]),
+ reorderedDocumentPriorities[z],reorderedDocumentPrerequisites[z]);
+ // Signal if we need to perform the flip
+ if (hopcountChangesSeen != null && hopcountChangesSeen[z])
+ reactivateRemovedHopcountRecords = true;
+ }
}
- if (parentIdentifierHash != null && relationshipType != null)
- hopCount.recordReferences(jobID,legalLinkTypes,parentIdentifierHash,reorderedDocIDHashes,relationshipType,hopcountMethod);
+ if (reactivateRemovedHopcountRecords)
+ jobQueue.reactivateHopcountRemovedRecords(jobID);
database.performCommit();
@@ -5128,10 +5364,13 @@ public class JobManager implements IJobM
if (legalLinkTypes.length > 0)
{
ArrayList list = new ArrayList();
- list.add(jobQueue.statusToString(jobQueue.STATUS_PENDING));
+ String query = database.buildConjunctionClause(list,new ClauseDescription[]{
+ new MultiClause("t99."+jobQueue.statusField,new Object[]{
+ jobQueue.statusToString(jobQueue.STATUS_PENDING),
+ jobQueue.statusToString(jobQueue.STATUS_HOPCOUNTREMOVED)})});
hopCount.deleteMatchingDocuments(jobID,legalLinkTypes,jobQueue.getTableName()+" t99",
"t99."+jobQueue.docHashField,"t99."+jobQueue.jobIDField,
- "t99."+jobQueue.statusField+"=?",list,
+ query,list,
hopcountMethod);
}
@@ -6557,7 +6796,6 @@ public class JobManager implements IJobM
IJobDescription jobDesc = jobs.load(jobID,true);
resetJobs.add(jobDesc);
- // Label the job "finished"
jobs.finishJob(jobID,currentTime);
if (Logging.jobs.isDebugEnabled())
{
@@ -6567,7 +6805,7 @@ public class JobManager implements IJobM
}
}
-
+
// Status reports
/** Get the status of a job.
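
markDocumentHopcountRemovalMultiple above follows the commit's standard deadlock-avoidance recipe: sort the document ID hashes into a canonical order, lock one row at a time with SELECT ... FOR UPDATE inside a serialized transaction, and retry the whole batch when the database aborts the transaction. Condensed to its skeleton (database, getRandomAmount, and sleepFor are the framework helpers used above; the per-row update body is elided):

java.util.Arrays.sort(docIDHashes);  // always lock rows in hash order

while (true)
{
  long sleepAmt = 0L;
  database.beginTransaction(database.TRANSACTION_SERIALIZED);
  try
  {
    for (String docIDHash : docIDHashes)
    {
      // SELECT ... FOR UPDATE on the single row, then update or remove it.
    }
    database.performCommit();
    return;
  }
  catch (ManifoldCFException e)
  {
    database.signalRollback();
    if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
    {
      sleepAmt = getRandomAmount();  // randomized backoff before retrying
      continue;
    }
    throw e;
  }
  catch (Error e)
  {
    database.signalRollback();
    throw e;
  }
  finally
  {
    database.endTransaction();
    sleepFor(sleepAmt);
  }
}
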
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java?rev=1372225&r1=1372224&r2=1372225&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java Mon Aug 13 00:44:05 2012
@@ -68,7 +68,7 @@ public class JobQueue extends org.apache
public final static int STATUS_ACTIVENEEDRESCANPURGATORY = 8;
public final static int STATUS_BEINGCLEANED = 9;
public final static int STATUS_ELIGIBLEFORDELETE = 10;
-
+ public final static int STATUS_HOPCOUNTREMOVED = 11;
// Action values
public final static int ACTION_RESCAN = 0;
public final static int ACTION_REMOVE = 1;
@@ -126,6 +126,7 @@ public class JobQueue extends org.apache
statusMap.put("a",new Integer(STATUS_ACTIVENEEDRESCAN));
statusMap.put("f",new Integer(STATUS_ACTIVENEEDRESCANPURGATORY));
statusMap.put("d",new Integer(STATUS_BEINGCLEANED));
+ statusMap.put("H",new Integer(STATUS_HOPCOUNTREMOVED));
}
protected static Map seedstatusMap;
@@ -364,6 +365,35 @@ public class JobQueue extends org.apache
unconditionallyAnalyzeTables();
}
+ /** Flip all records for a job that have status HOPCOUNTREMOVED back to PENDING.
+  * NOTE: These records still need to be scheduled, so this update alone may not be sufficient.
+ */
+ public void reactivateHopcountRemovedRecords(Long jobID)
+ throws ManifoldCFException
+ {
+ Map map = new HashMap();
+ // Map HOPCOUNTREMOVED to PENDING
+ map.put(statusField,statusToString(STATUS_PENDING));
+ map.put(checkTimeField,new Long(0L));
+ ArrayList list = new ArrayList();
+ String query = buildConjunctionClause(list,new ClauseDescription[]{
+ new UnitaryClause(jobIDField,jobID),
+ new UnitaryClause(statusField,statusToString(STATUS_HOPCOUNTREMOVED))});
+ performUpdate(map,"WHERE "+query,list,null);
+ }
+
+ /** Delete all records for a job that have status HOPCOUNTREMOVED.
+ */
+ public void deleteHopcountRemovedRecords(Long jobID)
+ throws ManifoldCFException
+ {
+ ArrayList list = new ArrayList();
+ String query = buildConjunctionClause(list,new ClauseDescription[]{
+ new UnitaryClause(jobIDField,jobID),
+ new UnitaryClause(statusField,statusToString(STATUS_HOPCOUNTREMOVED))});
+ performDelete("WHERE "+query,list,null);
+ }
+
/** Clear the failtimes for all documents associated with a job.
* This method is called when the system detects that a significant delaying event has occurred,
* and therefore the "failure clock" needs to be reset.
@@ -451,14 +481,19 @@ public class JobQueue extends org.apache
{
// Delete PENDING entries
ArrayList list = new ArrayList();
- list.add(jobID);
- list.add(statusToString(STATUS_PENDING));
+ String query = buildConjunctionClause(list,new ClauseDescription[]{
+ new UnitaryClause("t0."+jobIDField,jobID),
+ new MultiClause("t0."+statusField,new Object[]{
+ statusToString(STATUS_PENDING),
+ statusToString(STATUS_HOPCOUNTREMOVED)})});
// Clean out prereqevents table first
- prereqEventManager.deleteRows(getTableName()+" t0","t0."+idField,"t0."+jobIDField+"=? AND t0."+statusField+"=?",list);
+ prereqEventManager.deleteRows(getTableName()+" t0","t0."+idField,query,list);
list.clear();
- String query = buildConjunctionClause(list,new ClauseDescription[]{
+ query = buildConjunctionClause(list,new ClauseDescription[]{
new UnitaryClause(jobIDField,jobID),
- new UnitaryClause(statusField,statusToString(STATUS_PENDING))});
+ new MultiClause(statusField,new Object[]{
+ statusToString(STATUS_PENDING),
+ statusToString(STATUS_HOPCOUNTREMOVED)})});
performDelete("WHERE "+query,list,null);
// Turn PENDINGPURGATORY, PURGATORY, COMPLETED into ELIGIBLEFORDELETE.
@@ -687,6 +722,96 @@ public class JobQueue extends org.apache
performUpdate(map,"WHERE "+query,list,null);
}
+ /** Either delete a record, or set status to "rescan", depending on the
+ * record's state.
+ */
+ public boolean updateOrDeleteRecord(Long recID, int currentStatus)
+ throws ManifoldCFException
+ {
+ HashMap map = new HashMap();
+
+ int newStatus;
+ String actionFieldValue;
+ Long checkTimeValue;
+
+ switch (currentStatus)
+ {
+ case STATUS_ACTIVE:
+ case STATUS_ACTIVEPURGATORY:
+ // Delete it
+ deleteRecord(recID);
+ return true;
+ case STATUS_ACTIVENEEDRESCAN:
+ case STATUS_ACTIVENEEDRESCANPURGATORY:
+ newStatus = STATUS_PENDINGPURGATORY;
+ actionFieldValue = actionToString(ACTION_RESCAN);
+ checkTimeValue = new Long(0L);
+ // Leave doc priority unchanged.
+ break;
+ default:
+ throw new ManifoldCFException("Unexpected jobqueue status - record id "+recID.toString()+", expecting active status, saw "+Integer.toString(currentStatus));
+ }
+
+ map.put(statusField,statusToString(newStatus));
+ map.put(checkTimeField,checkTimeValue);
+ map.put(checkActionField,actionFieldValue);
+ map.put(failTimeField,null);
+ map.put(failCountField,null);
+ ArrayList list = new ArrayList();
+ String query = buildConjunctionClause(list,new ClauseDescription[]{
+ new UnitaryClause(idField,recID)});
+ performUpdate(map,"WHERE "+query,list,null);
+ return false;
+ }
+
+ /** Either mark a record as hopcountremoved, or set status to "rescan", depending on the
+ * record's state.
+ */
+ public boolean updateOrHopcountRemoveRecord(Long recID, int currentStatus)
+ throws ManifoldCFException
+ {
+ HashMap map = new HashMap();
+
+ int newStatus;
+ String actionFieldValue;
+ Long checkTimeValue;
+
+ boolean rval;
+
+ switch (currentStatus)
+ {
+ case STATUS_ACTIVE:
+ case STATUS_ACTIVEPURGATORY:
+      // Mark as hopcountremoved
+ newStatus = STATUS_HOPCOUNTREMOVED;
+ actionFieldValue = actionToString(ACTION_RESCAN);
+ checkTimeValue = new Long(0L);
+ rval = true;
+ break;
+ case STATUS_ACTIVENEEDRESCAN:
+ case STATUS_ACTIVENEEDRESCANPURGATORY:
+ newStatus = STATUS_PENDINGPURGATORY;
+ actionFieldValue = actionToString(ACTION_RESCAN);
+ checkTimeValue = new Long(0L);
+ rval = false;
+ // Leave doc priority unchanged.
+ break;
+ default:
+ throw new ManifoldCFException("Unexpected jobqueue status - record id "+recID.toString()+", expecting active status, saw "+Integer.toString(currentStatus));
+ }
+
+ map.put(statusField,statusToString(newStatus));
+ map.put(checkTimeField,checkTimeValue);
+ map.put(checkActionField,actionFieldValue);
+ map.put(failTimeField,null);
+ map.put(failCountField,null);
+ ArrayList list = new ArrayList();
+ String query = buildConjunctionClause(list,new ClauseDescription[]{
+ new UnitaryClause(idField,recID)});
+ performUpdate(map,"WHERE "+query,list,null);
+ return rval;
+ }
+
/** Set the status to active on a record, leaving alone priority or check time.
*@param id is the job queue id.
*@param currentStatus is the current status
@@ -1159,10 +1284,11 @@ public class JobQueue extends org.apache
/** Update an existing record (as the result of a reference add).
* The record is presumed to exist and have been locked, via "FOR UPDATE".
+ *@return true if the document priority slot has been retained, false if freed.
*/
public boolean updateExistingRecord(Long recordID, int currentStatus, Long checkTimeValue,
- long desiredExecuteTime, long currentTime, boolean otherChangesSeen, double desiredPriority,
- String[] prereqEvents)
+ long desiredExecuteTime, long currentTime, boolean otherChangesSeen,
+ double desiredPriority, String[] prereqEvents)
throws ManifoldCFException
{
boolean rval = false;
@@ -1414,6 +1540,8 @@ public class JobQueue extends org.apache
return "f";
case STATUS_BEINGCLEANED:
return "d";
+ case STATUS_HOPCOUNTREMOVED:
+ return "H";
default:
throw new ManifoldCFException("Bad status value: "+Integer.toString(status));
}
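
The new state round-trips through the one-character column encoding like every other status. A tiny sketch of that mapping, using the statusToString and stringToStatus methods shown in this diff (the assertion harness is illustrative):

// Sketch: exercises the new status mapping added above.
int status = JobQueue.STATUS_HOPCOUNTREMOVED;
String encoded = jobQueue.statusToString(status);   // yields "H" per the new case above
int decoded = jobQueue.stringToStatus(encoded);     // maps back via the "H" entry in statusMap
assert decoded == JobQueue.STATUS_HOPCOUNTREMOVED;
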
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java?rev=1372225&r1=1372224&r2=1372225&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/DocumentCleanupThread.java Mon Aug 13 00:44:05 2012
@@ -232,7 +232,7 @@ public class DocumentCleanupThread exten
Long jobID = ddd.getJobID();
int hopcountMethod = ((Integer)hopcountMethods.get(k)).intValue();
String[] legalLinkTypes = (String[])arrayRelationshipTypes.get(k);
- DocumentDescription[] requeueCandidates = jobManager.markDocumentDeleted(jobID,legalLinkTypes,ddd,hopcountMethod);
+ DocumentDescription[] requeueCandidates = jobManager.markDocumentCleanedUp(jobID,legalLinkTypes,ddd,hopcountMethod);
// Use the common method for doing the requeuing
ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,
connector,connection,queueTracker,currentTime);
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java?rev=1372225&r1=1372224&r2=1372225&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/system/ExpireThread.java Mon Aug 13 00:44:05 2012
@@ -235,7 +235,7 @@ public class ExpireThread extends Thread
Long jobID = ddd.getJobID();
int hopcountMethod = ((Integer)hopcountMethods.get(k)).intValue();
String[] legalLinkTypes = (String[])arrayRelationshipTypes.get(k);
- DocumentDescription[] requeueCandidates = jobManager.markDocumentDeleted(jobID,legalLinkTypes,ddd,hopcountMethod);
+ DocumentDescription[] requeueCandidates = jobManager.markDocumentExpired(jobID,legalLinkTypes,ddd,hopcountMethod);
// Use the common method for doing the requeuing
ManifoldCF.requeueDocumentsDueToCarrydown(jobManager,requeueCandidates,
connector,connection,queueTracker,currentTime);