You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2012/08/08 03:02:52 UTC
svn commit: r1370620 - in
/manifoldcf/branches/CONNECTORS-501/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs:
JobManager.java JobQueue.java
Author: kwright
Date: Wed Aug 8 01:02:52 2012
New Revision: 1370620
URL: http://svn.apache.org/viewvc?rev=1370620&view=rev
Log:
Code that hopefully implements the conditional deletion needed to fix this ticket.
Modified:
manifoldcf/branches/CONNECTORS-501/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
manifoldcf/branches/CONNECTORS-501/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
Modified: manifoldcf/branches/CONNECTORS-501/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-501/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1370620&r1=1370619&r2=1370620&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-501/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java (original)
+++ manifoldcf/branches/CONNECTORS-501/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Wed Aug 8 01:02:52 2012
@@ -2488,8 +2488,115 @@ public class JobManager implements IJobM
int hopcountMethod)
throws ManifoldCFException
{
- // MHL
- return doDeleteMultiple(jobID,legalLinkTypes,documentDescriptions,hopcountMethod);
+ // For each record, we're going to have to choose between deleting it, and marking it for rescan. So the
+ // basic flow may involve changing a document's status, or blowing it away completely.
+
+ // Before we can change a document status, we need to know the *current* status. Therefore, a SELECT xxx FOR UPDATE/UPDATE
+ // transaction is needed in order to complete these documents correctly.
+ //
+ // Since we are therefore setting row locks on the jobqueue table, we need to work to avoid unnecessary deadlocking. To do that, we have to
+ // lock rows in document id hash order!! Luckily, the DocumentDescription objects have a document identifier buried within, which we can use to
+ // order the "select for update" operations appropriately.
+ //
+
+ HashMap indexMap = new HashMap();
+ String[] docIDHashes = new String[documentDescriptions.length];
+
+ int i = 0;
+ while (i < documentDescriptions.length)
+ {
+ String documentIDHash = documentDescriptions[i].getDocumentIdentifierHash() + ":" + documentDescriptions[i].getJobID();
+ docIDHashes[i] = documentIDHash;
+ indexMap.put(documentIDHash,new Integer(i));
+ i++;
+ }
+
+ java.util.Arrays.sort(docIDHashes);
+
+ // Retry loop - in case we get a deadlock despite our best efforts
+ while (true)
+ {
+ long sleepAmt = 0L;
+ database.beginTransaction(database.TRANSACTION_SERIALIZED);
+
+ // Start the transaction now
+ database.beginTransaction();
+ try
+ {
+ // Do one row at a time, to avoid deadlocking things
+ List<String> deleteList = new ArrayList<String>();
+
+ i = 0;
+ while (i < docIDHashes.length)
+ {
+ String docIDHash = docIDHashes[i];
+
+ // Get the DocumentDescription object
+ DocumentDescription dd = documentDescriptions[((Integer)indexMap.get(docIDHash)).intValue()];
+
+ // Query for the status
+ ArrayList list = new ArrayList();
+ String query = database.buildConjunctionClause(list,new ClauseDescription[]{
+ new UnitaryClause(jobQueue.idField,dd.getID())});
+ IResultSet set = database.performQuery("SELECT "+jobQueue.statusField+" FROM "+jobQueue.getTableName()+" WHERE "+
+ query+" FOR UPDATE",list,null,null);
+ if (set.getRowCount() > 0)
+ {
+ IResultRow row = set.getRow(0);
+ // Grab the status
+ int status = jobQueue.stringToStatus((String)row.getValue(jobQueue.statusField));
+ // Update the jobqueue table
+ boolean didDelete = jobQueue.updateOrDeleteRecord(dd.getID(),status);
+ if (didDelete)
+ {
+ deleteList.add(dd.getDocumentIdentifierHash());
+ }
+ }
+ i++;
+ }
+
+ String[] docIDSimpleHashes = new String[deleteList.size()];
+ for (int j = 0; j < docIDSimpleHashes.length; j++)
+ {
+ docIDSimpleHashes[j] = deleteList.get(j);
+ }
+
+ // Next, find the documents that are affected by carrydown deletion.
+ DocumentDescription[] rval = calculateAffectedDeleteCarrydownChildren(jobID,docIDSimpleHashes);
+
+ // Finally, delete the carrydown records in question.
+ carryDown.deleteRecords(jobID,docIDSimpleHashes);
+ if (legalLinkTypes.length > 0)
+ hopCount.deleteDocumentIdentifiers(jobID,legalLinkTypes,docIDSimpleHashes,hopcountMethod);
+
+ database.performCommit();
+ return rval;
+ }
+ catch (ManifoldCFException e)
+ {
+ database.signalRollback();
+ if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+ {
+ if (Logging.perf.isDebugEnabled())
+ Logging.perf.debug("Aborted transaction marking completed "+Integer.toString(docIDHashes.length)+
+ " docs: "+e.getMessage());
+ sleepAmt = getRandomAmount();
+ continue;
+ }
+ throw e;
+ }
+ catch (Error e)
+ {
+ database.signalRollback();
+ throw e;
+ }
+ finally
+ {
+ database.endTransaction();
+ sleepFor(sleepAmt);
+ }
+ }
+
}
/** Delete from queue as a result of processing of an active document.
Modified: manifoldcf/branches/CONNECTORS-501/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-501/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java?rev=1370620&r1=1370619&r2=1370620&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-501/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java (original)
+++ manifoldcf/branches/CONNECTORS-501/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java Wed Aug 8 01:02:52 2012
@@ -687,6 +687,48 @@ public class JobQueue extends org.apache
performUpdate(map,"WHERE "+query,list,null);
}
+ /** Either delete a record, or set status to "rescan", depending on the
+ * record's state.
+ * A record whose current status is STATUS_ACTIVE or STATUS_ACTIVEPURGATORY is deleted.
+ * A record whose current status is STATUS_ACTIVENEEDRESCAN or STATUS_ACTIVENEEDRESCANPURGATORY is kept:
+ * its status becomes STATUS_PENDINGPURGATORY, its check action becomes ACTION_RESCAN, its check time
+ * becomes 0, and its fail time and fail count are set to null.  The document priority is not touched.
+ *@param recID is the id of the jobqueue record to act on.
+ *@param currentStatus is the record's current status, as supplied by the caller.
+ *@return true if the record was deleted, false if it was updated instead.
+ *@throws ManifoldCFException if currentStatus is any value other than the four listed above
+ * (the delete/update helpers called here may presumably throw it as well - not visible in this method).
+ */
+ public boolean updateOrDeleteRecord(Long recID, int currentStatus)
+ throws ManifoldCFException
+ {
+ HashMap map = new HashMap(); // column values for the update path; not used when the record is deleted
+
+ int newStatus;
+ String actionFieldValue;
+ Long checkTimeValue;
+
+ switch (currentStatus)
+ {
+ case STATUS_ACTIVE:
+ case STATUS_ACTIVEPURGATORY:
+ // No rescan was requested for these two states: delete the record and report the deletion to the caller.
+ deleteRecord(recID);
+ return true;
+ case STATUS_ACTIVENEEDRESCAN:
+ case STATUS_ACTIVENEEDRESCANPURGATORY:
+ newStatus = STATUS_PENDINGPURGATORY;
+ actionFieldValue = actionToString(ACTION_RESCAN);
+ checkTimeValue = new Long(0L);
+ // Leave doc priority unchanged (no priority column is added to the update map below).
+ break;
+ default:
+ throw new ManifoldCFException("Unexpected jobqueue status - record id "+recID.toString()+", expecting active status, saw "+Integer.toString(currentStatus));
+ }
+
+ map.put(statusField,statusToString(newStatus));
+ map.put(checkTimeField,checkTimeValue);
+ map.put(checkActionField,actionFieldValue);
+ map.put(failTimeField,null); // null out the fail time
+ map.put(failCountField,null); // null out the fail count
+ ArrayList list = new ArrayList(); // receives the query parameters produced by buildConjunctionClause
+ String query = buildConjunctionClause(list,new ClauseDescription[]{
+ new UnitaryClause(idField,recID)}); // clause on idField/recID - presumably matches only this record's row
+ performUpdate(map,"WHERE "+query,list,null);
+ return false; // the record was updated, not deleted
+ }
+
/** Set the status to active on a record, leaving alone priority or check time.
*@param id is the job queue id.
*@param currentStatus is the current status