You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2012/08/08 03:02:52 UTC

svn commit: r1370620 - in /manifoldcf/branches/CONNECTORS-501/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs: JobManager.java JobQueue.java

Author: kwright
Date: Wed Aug  8 01:02:52 2012
New Revision: 1370620

URL: http://svn.apache.org/viewvc?rev=1370620&view=rev
Log:
Code that hopefully implements the conditional deletion needed to fix this ticket.

Modified:
    manifoldcf/branches/CONNECTORS-501/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
    manifoldcf/branches/CONNECTORS-501/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java

Modified: manifoldcf/branches/CONNECTORS-501/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-501/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1370620&r1=1370619&r2=1370620&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-501/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java (original)
+++ manifoldcf/branches/CONNECTORS-501/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Wed Aug  8 01:02:52 2012
@@ -2488,8 +2488,115 @@ public class JobManager implements IJobM
     int hopcountMethod)
     throws ManifoldCFException
   {
-    // MHL
-    return doDeleteMultiple(jobID,legalLinkTypes,documentDescriptions,hopcountMethod);
+    // For each record, we're going to have to choose between deleting it, and marking it for rescan.  So the
+    // basic flow may involve changing a document's status, or blowing it away completely.
+    
+    // Before we can change a document status, we need to know the *current* status.  Therefore, a SELECT xxx FOR UPDATE/UPDATE
+    // transaction is needed in order to complete these documents correctly.
+    //
+    // Since we are therefore setting row locks on the jobqueue table, we need to work to avoid unnecessary deadlocking.  To do that, we have to
+    // lock rows in document id hash order!!  Luckily, the DocumentDescription objects have a document identifier buried within, which we can use to
+    // order the "select for update" operations appropriately.
+    //
+
+    HashMap indexMap = new HashMap();
+    String[] docIDHashes = new String[documentDescriptions.length];
+
+    int i = 0;
+    while (i < documentDescriptions.length)
+    {
+      String documentIDHash = documentDescriptions[i].getDocumentIdentifierHash() + ":" + documentDescriptions[i].getJobID();
+      docIDHashes[i] = documentIDHash;
+      indexMap.put(documentIDHash,new Integer(i));
+      i++;
+    }
+
+    java.util.Arrays.sort(docIDHashes);
+
+    // Retry loop - in case we get a deadlock despite our best efforts
+    while (true)
+    {
+      long sleepAmt = 0L;
+      database.beginTransaction(database.TRANSACTION_SERIALIZED);
+
+      // Start the transaction now
+      database.beginTransaction();
+      try
+      {
+        // Do one row at a time, to avoid deadlocking things
+        List<String> deleteList = new ArrayList<String>();
+        
+        i = 0;
+        while (i < docIDHashes.length)
+        {
+          String docIDHash = docIDHashes[i];
+
+          // Get the DocumentDescription object
+          DocumentDescription dd = documentDescriptions[((Integer)indexMap.get(docIDHash)).intValue()];
+
+          // Query for the status
+          ArrayList list = new ArrayList();
+          String query = database.buildConjunctionClause(list,new ClauseDescription[]{
+            new UnitaryClause(jobQueue.idField,dd.getID())});
+          IResultSet set = database.performQuery("SELECT "+jobQueue.statusField+" FROM "+jobQueue.getTableName()+" WHERE "+
+            query+" FOR UPDATE",list,null,null);
+          if (set.getRowCount() > 0)
+          {
+            IResultRow row = set.getRow(0);
+            // Grab the status
+            int status = jobQueue.stringToStatus((String)row.getValue(jobQueue.statusField));
+            // Update the jobqueue table
+            boolean didDelete = jobQueue.updateOrDeleteRecord(dd.getID(),status);
+            if (didDelete)
+            {
+              deleteList.add(dd.getDocumentIdentifierHash());
+            }
+          }
+          i++;
+        }
+        
+        String[] docIDSimpleHashes = new String[deleteList.size()];
+        for (int j = 0; j < docIDSimpleHashes.length; j++)
+        {
+          docIDSimpleHashes[j] = deleteList.get(j);
+        }
+        
+        // Next, find the documents that are affected by carrydown deletion.
+        DocumentDescription[] rval = calculateAffectedDeleteCarrydownChildren(jobID,docIDSimpleHashes);
+
+        // Finally, delete the carrydown records in question.
+        carryDown.deleteRecords(jobID,docIDSimpleHashes);
+        if (legalLinkTypes.length > 0)
+          hopCount.deleteDocumentIdentifiers(jobID,legalLinkTypes,docIDSimpleHashes,hopcountMethod);
+
+        database.performCommit();
+        return rval;
+      }
+      catch (ManifoldCFException e)
+      {
+        database.signalRollback();
+        if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+        {
+          if (Logging.perf.isDebugEnabled())
+            Logging.perf.debug("Aborted transaction marking completed "+Integer.toString(docIDHashes.length)+
+            " docs: "+e.getMessage());
+          sleepAmt = getRandomAmount();
+          continue;
+        }
+        throw e;
+      }
+      catch (Error e)
+      {
+        database.signalRollback();
+        throw e;
+      }
+      finally
+      {
+        database.endTransaction();
+        sleepFor(sleepAmt);
+      }
+    }
+
   }
 
   /** Delete from queue as a result of processing of an active document.

Modified: manifoldcf/branches/CONNECTORS-501/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-501/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java?rev=1370620&r1=1370619&r2=1370620&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-501/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java (original)
+++ manifoldcf/branches/CONNECTORS-501/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobQueue.java Wed Aug  8 01:02:52 2012
@@ -687,6 +687,48 @@ public class JobQueue extends org.apache
     performUpdate(map,"WHERE "+query,list,null);
   }
 
+  /** Either delete a record, or set status to "rescan", depending on the
+  * record's state.
+  * A record whose current status is STATUS_ACTIVE or STATUS_ACTIVEPURGATORY is deleted.
+  * A record whose current status is STATUS_ACTIVENEEDRESCAN or STATUS_ACTIVENEEDRESCANPURGATORY
+  * is kept and updated instead: its status becomes STATUS_PENDINGPURGATORY, its check action
+  * becomes ACTION_RESCAN, its check time is set to 0, and its fail time and fail count are
+  * set to null.  The document priority is not touched.
+  *@param recID is the job queue id of the record to delete or update.
+  *@param currentStatus is the record's current status, as supplied by the caller; this method
+  * does not re-read it from the table.
+  *@return true if the record was deleted, false if it was updated instead.
+  *@throws ManifoldCFException if currentStatus is not one of the four statuses listed above
+  * (presumably also if the underlying delete or update fails - confirm against those methods).
+  */
+  public boolean updateOrDeleteRecord(Long recID, int currentStatus)
+    throws ManifoldCFException
+  {
+    HashMap map = new HashMap();
+    
+    int newStatus;
+    String actionFieldValue;
+    Long checkTimeValue;
+    
+    switch (currentStatus)
+    {
+    case STATUS_ACTIVE:
+    case STATUS_ACTIVEPURGATORY:
+      // Status is not one of the "need rescan" variants: delete it
+      deleteRecord(recID);
+      return true;
+    case STATUS_ACTIVENEEDRESCAN:
+    case STATUS_ACTIVENEEDRESCANPURGATORY:
+      newStatus = STATUS_PENDINGPURGATORY;
+      actionFieldValue = actionToString(ACTION_RESCAN);
+      checkTimeValue = new Long(0L);
+      // Leave doc priority unchanged.
+      break;
+    default:
+      throw new ManifoldCFException("Unexpected jobqueue status - record id "+recID.toString()+", expecting active status, saw "+Integer.toString(currentStatus));
+    }
+
+    map.put(statusField,statusToString(newStatus));
+    map.put(checkTimeField,checkTimeValue);
+    map.put(checkActionField,actionFieldValue);
+    map.put(failTimeField,null);
+    map.put(failCountField,null);
+    ArrayList list = new ArrayList();
+    String query = buildConjunctionClause(list,new ClauseDescription[]{
+      new UnitaryClause(idField,recID)});
+    performUpdate(map,"WHERE "+query,list,null);
+    return false;
+  }
+
   /** Set the status to active on a record, leaving alone priority or check time.
   *@param id is the job queue id.
   *@param currentStatus is the current status