You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2012/12/12 03:28:56 UTC

svn commit: r1420512 - /manifoldcf/branches/CONNECTORS-584/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java

Author: kwright
Date: Wed Dec 12 02:28:55 2012
New Revision: 1420512

URL: http://svn.apache.org/viewvc?rev=1420512&view=rev
Log:
Retry the job-start transaction in a loop, because MySQL frequently hits lock timeouts on this query

Modified:
    manifoldcf/branches/CONNECTORS-584/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java

Modified: manifoldcf/branches/CONNECTORS-584/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/branches/CONNECTORS-584/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1420512&r1=1420511&r2=1420512&view=diff
==============================================================================
--- manifoldcf/branches/CONNECTORS-584/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java (original)
+++ manifoldcf/branches/CONNECTORS-584/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Wed Dec 12 02:28:55 2012
@@ -4684,223 +4684,237 @@ public class JobManager implements IJobM
     // Note well: We can't combine locks across both our lock manager and the database unless we do it consistently.  The
     // consistent practice throughout CF is to do the external locks first, then the database locks.  This particular method
     // thus cannot use cached job description information, because it must throw database locks first against the jobs table.
-    database.beginTransaction();
-    try
+    while (true)
     {
-      // First, query the appropriate fields of all jobs.
-      StringBuilder sb = new StringBuilder("SELECT ");
-      ArrayList list = new ArrayList();
-      
-      sb.append(jobs.idField).append(",")
-        .append(jobs.lastTimeField).append(",")
-        .append(jobs.statusField).append(",")
-        .append(jobs.startMethodField).append(",")
-        .append(jobs.outputNameField).append(",")
-        .append(jobs.connectionNameField)
-        .append(" FROM ").append(jobs.getTableName()).append(" WHERE ")
-        .append(database.buildConjunctionClause(list,new ClauseDescription[]{
-          new MultiClause(jobs.statusField,new Object[]{
-            jobs.statusToString(jobs.STATUS_INACTIVE),
-            jobs.statusToString(jobs.STATUS_ACTIVEWAIT),
-            jobs.statusToString(jobs.STATUS_ACTIVEWAITSEEDING),
-            jobs.statusToString(jobs.STATUS_PAUSEDWAIT),
-            jobs.statusToString(jobs.STATUS_PAUSEDWAITSEEDING)})})).append(" AND ")
-        .append(jobs.startMethodField).append("!=? FOR UPDATE");
-      
-      list.add(jobs.startMethodToString(IJobDescription.START_DISABLE));
-      
-      IResultSet set = database.performQuery(sb.toString(),list,null,null);
-
-      // Next, we query for the schedule information.  In order to do that, we amass a list of job identifiers that we want schedule info
-      // for.
-      Long[] jobIDSet = new Long[set.getRowCount()];
-      int i = 0;
-      while (i < set.getRowCount())
-      {
-        IResultRow row = set.getRow(i);
-        jobIDSet[i++] = (Long)row.getValue(jobs.idField);
-      }
-
-      ScheduleRecord[][] srSet = jobs.readScheduleRecords(jobIDSet);
-
-      i = 0;
-      while (i < set.getRowCount())
+      long sleepAmt = 0L;
+      database.beginTransaction();
+      try
       {
-        IResultRow row = set.getRow(i);
-
-        Long jobID = (Long)row.getValue(jobs.idField);
-        int startMethod = jobs.stringToStartMethod((String)row.getValue(jobs.startMethodField));
-        String outputName = (String)row.getValue(jobs.outputNameField);
-        String connectionName = (String)row.getValue(jobs.connectionNameField);
-        ScheduleRecord[] thisSchedule = srSet[i++];
-
-        // Run at specific times
+        // First, query the appropriate fields of all jobs.
+        StringBuilder sb = new StringBuilder("SELECT ");
+        ArrayList list = new ArrayList();
+        
+        sb.append(jobs.idField).append(",")
+          .append(jobs.lastTimeField).append(",")
+          .append(jobs.statusField).append(",")
+          .append(jobs.startMethodField).append(",")
+          .append(jobs.outputNameField).append(",")
+          .append(jobs.connectionNameField)
+          .append(" FROM ").append(jobs.getTableName()).append(" WHERE ")
+          .append(database.buildConjunctionClause(list,new ClauseDescription[]{
+            new MultiClause(jobs.statusField,new Object[]{
+              jobs.statusToString(jobs.STATUS_INACTIVE),
+              jobs.statusToString(jobs.STATUS_ACTIVEWAIT),
+              jobs.statusToString(jobs.STATUS_ACTIVEWAITSEEDING),
+              jobs.statusToString(jobs.STATUS_PAUSEDWAIT),
+              jobs.statusToString(jobs.STATUS_PAUSEDWAITSEEDING)})})).append(" AND ")
+          .append(jobs.startMethodField).append("!=? FOR UPDATE");
+        
+        list.add(jobs.startMethodToString(IJobDescription.START_DISABLE));
+        
+        IResultSet set = database.performQuery(sb.toString(),list,null,null);
 
-        // We need to start with the start time as given, plus one
-        long startInterval = ((Long)row.getValue(jobs.lastTimeField)).longValue() + 1;
-        if (Logging.jobs.isDebugEnabled())
-          Logging.jobs.debug("Checking if job "+jobID.toString()+" needs to be started; it was last checked at "+
-          new Long(startInterval).toString()+", and now it is "+new Long(currentTime).toString());
+        // Next, we query for the schedule information.  In order to do that, we amass a list of job identifiers that we want schedule info
+        // for.
+        Long[] jobIDSet = new Long[set.getRowCount()];
+        int i = 0;
+        while (i < set.getRowCount())
+        {
+          IResultRow row = set.getRow(i);
+          jobIDSet[i++] = (Long)row.getValue(jobs.idField);
+        }
 
-        // Proceed to the current time, and find a match if there is one to be found.
-        // If not -> continue
+        ScheduleRecord[][] srSet = jobs.readScheduleRecords(jobIDSet);
 
-        // We go through *all* the schedule records.  The one that matches that has the latest
-        // end time is the one we take.
-        int l = 0;
-        Long matchTime = null;
-        Long duration = null;
-        while (l < thisSchedule.length)
-        {
-          long trialStartInterval = startInterval;
-          ScheduleRecord sr = thisSchedule[l++];
-          Long thisDuration = sr.getDuration();
-          if (startMethod == IJobDescription.START_WINDOWINSIDE &&
-            thisDuration != null)
-          {
-            // Bump the start interval back before the beginning of the current interval.
-            // This will guarantee a start as long as there is time in the window.
-            long trialStart = currentTime - thisDuration.longValue();
-            if (trialStart < trialStartInterval)
-              trialStartInterval = trialStart;
-          }
+        i = 0;
+        while (i < set.getRowCount())
+        {
+          IResultRow row = set.getRow(i);
 
-          Long thisMatchTime = checkTimeMatch(trialStartInterval,currentTime,
-            sr.getDayOfWeek(),
-            sr.getDayOfMonth(),
-            sr.getMonthOfYear(),
-            sr.getYear(),
-            sr.getHourOfDay(),
-            sr.getMinutesOfHour(),
-            sr.getTimezone(),
-            thisDuration);
+          Long jobID = (Long)row.getValue(jobs.idField);
+          int startMethod = jobs.stringToStartMethod((String)row.getValue(jobs.startMethodField));
+          String outputName = (String)row.getValue(jobs.outputNameField);
+          String connectionName = (String)row.getValue(jobs.connectionNameField);
+          ScheduleRecord[] thisSchedule = srSet[i++];
+
+          // Run at specific times
+
+          // We need to start with the start time as given, plus one
+          long startInterval = ((Long)row.getValue(jobs.lastTimeField)).longValue() + 1;
+          if (Logging.jobs.isDebugEnabled())
+            Logging.jobs.debug("Checking if job "+jobID.toString()+" needs to be started; it was last checked at "+
+            new Long(startInterval).toString()+", and now it is "+new Long(currentTime).toString());
+
+          // Proceed to the current time, and find a match if there is one to be found.
+          // If not -> continue
+
+          // We go through *all* the schedule records.  The one that matches that has the latest
+          // end time is the one we take.
+          int l = 0;
+          Long matchTime = null;
+          Long duration = null;
+          while (l < thisSchedule.length)
+          {
+            long trialStartInterval = startInterval;
+            ScheduleRecord sr = thisSchedule[l++];
+            Long thisDuration = sr.getDuration();
+            if (startMethod == IJobDescription.START_WINDOWINSIDE &&
+              thisDuration != null)
+            {
+              // Bump the start interval back before the beginning of the current interval.
+              // This will guarantee a start as long as there is time in the window.
+              long trialStart = currentTime - thisDuration.longValue();
+              if (trialStart < trialStartInterval)
+                trialStartInterval = trialStart;
+            }
+
+            Long thisMatchTime = checkTimeMatch(trialStartInterval,currentTime,
+              sr.getDayOfWeek(),
+              sr.getDayOfMonth(),
+              sr.getMonthOfYear(),
+              sr.getYear(),
+              sr.getHourOfDay(),
+              sr.getMinutesOfHour(),
+              sr.getTimezone(),
+              thisDuration);
+
+            if (thisMatchTime == null)
+            {
+              if (Logging.jobs.isDebugEnabled())
+                Logging.jobs.debug(" No time match found within interval "+new Long(trialStartInterval).toString()+
+                " to "+new Long(currentTime).toString());
+              continue;
+            }
 
-          if (thisMatchTime == null)
-          {
             if (Logging.jobs.isDebugEnabled())
-              Logging.jobs.debug(" No time match found within interval "+new Long(trialStartInterval).toString()+
+              Logging.jobs.debug(" Time match FOUND within interval "+new Long(trialStartInterval).toString()+
               " to "+new Long(currentTime).toString());
-            continue;
-          }
 
-          if (Logging.jobs.isDebugEnabled())
-            Logging.jobs.debug(" Time match FOUND within interval "+new Long(trialStartInterval).toString()+
-            " to "+new Long(currentTime).toString());
+            if (matchTime == null || thisDuration == null ||
+              (duration != null && thisMatchTime.longValue() + thisDuration.longValue() >
+            matchTime.longValue() + duration.longValue()))
+            {
+              matchTime = thisMatchTime;
+              duration = thisDuration;
+            }
+          }
 
-          if (matchTime == null || thisDuration == null ||
-            (duration != null && thisMatchTime.longValue() + thisDuration.longValue() >
-          matchTime.longValue() + duration.longValue()))
+          if (matchTime == null)
           {
-            matchTime = thisMatchTime;
-            duration = thisDuration;
+            jobs.updateLastTime(jobID,currentTime);
+            continue;
           }
-        }
-
-        if (matchTime == null)
-        {
-          jobs.updateLastTime(jobID,currentTime);
-          continue;
-        }
-
-        int status = jobs.stringToStatus(row.getValue(jobs.statusField).toString());
 
+          int status = jobs.stringToStatus(row.getValue(jobs.statusField).toString());
 
-        // Calculate the end of the window
-        Long windowEnd = null;
-        if (duration != null)
-        {
-          windowEnd = new Long(matchTime.longValue()+duration.longValue());
-        }
 
-        if (Logging.jobs.isDebugEnabled())
-        {
-          Logging.jobs.debug("Job '"+jobID+"' is within run window at "+new Long(currentTime).toString()+" ms. (which starts at "+
-            matchTime.toString()+" ms."+((duration==null)?"":(" and goes for "+duration.toString()+" ms."))+")");
-        }
-
-        int newJobState;
-        switch (status)
-        {
-        case Jobs.STATUS_INACTIVE:
-          // If job was formerly "inactive", do the full startup.
-          // Start this job!  but with no end time.
-          // This does not get logged because the startup thread does the logging.
-          jobs.startJob(jobID,windowEnd);
-          jobQueue.clearFailTimes(jobID);
-          if (Logging.jobs.isDebugEnabled())
+          // Calculate the end of the window
+          Long windowEnd = null;
+          if (duration != null)
           {
-            Logging.jobs.debug("Signalled for job start for job "+jobID);
+            windowEnd = new Long(matchTime.longValue()+duration.longValue());
           }
-          break;
-        case Jobs.STATUS_ACTIVEWAIT:
-          unwaitList.add(jobID);
-          jobs.unwaitJob(jobID,Jobs.STATUS_RESUMING,windowEnd);
-          jobQueue.clearFailTimes(jobID);
-          if (Logging.jobs.isDebugEnabled())
-          {
-            Logging.jobs.debug("Un-waited job "+jobID);
-          }
-          break;
-        case Jobs.STATUS_ACTIVEWAITSEEDING:
-          unwaitList.add(jobID);
-          jobs.unwaitJob(jobID,Jobs.STATUS_RESUMINGSEEDING,windowEnd);
-          jobQueue.clearFailTimes(jobID);
-          if (Logging.jobs.isDebugEnabled())
-          {
-            Logging.jobs.debug("Un-waited job "+jobID);
-          }
-          break;
-        case Jobs.STATUS_PAUSEDWAIT:
-          unwaitList.add(jobID);
-          jobs.unwaitJob(jobID,jobs.STATUS_PAUSED,windowEnd);
-          if (Logging.jobs.isDebugEnabled())
-          {
-            Logging.jobs.debug("Un-waited (but still paused) job "+jobID);
-          }
-          break;
-        case Jobs.STATUS_PAUSEDWAITSEEDING:
-          unwaitList.add(jobID);
-          jobs.unwaitJob(jobID,jobs.STATUS_PAUSEDSEEDING,windowEnd);
-          if (Logging.jobs.isDebugEnabled())
-          {
-            Logging.jobs.debug("Un-waited (but still paused) job "+jobID);
-          }
-          break;
-        case Jobs.STATUS_PAUSINGWAITING:
-          unwaitList.add(jobID);
-          jobs.unwaitJob(jobID,jobs.STATUS_PAUSING,windowEnd);
+
           if (Logging.jobs.isDebugEnabled())
           {
-            Logging.jobs.debug("Un-waited (but still paused) job "+jobID);
+            Logging.jobs.debug("Job '"+jobID+"' is within run window at "+new Long(currentTime).toString()+" ms. (which starts at "+
+              matchTime.toString()+" ms."+((duration==null)?"":(" and goes for "+duration.toString()+" ms."))+")");
           }
-          break;
-        case Jobs.STATUS_PAUSINGWAITINGSEEDING:
-          unwaitList.add(jobID);
-          jobs.unwaitJob(jobID,jobs.STATUS_PAUSINGSEEDING,windowEnd);
-          if (Logging.jobs.isDebugEnabled())
+
+          int newJobState;
+          switch (status)
           {
-            Logging.jobs.debug("Un-waited (but still paused) job "+jobID);
+          case Jobs.STATUS_INACTIVE:
+            // If job was formerly "inactive", do the full startup.
+            // Start this job!  but with no end time.
+            // This does not get logged because the startup thread does the logging.
+            jobs.startJob(jobID,windowEnd);
+            jobQueue.clearFailTimes(jobID);
+            if (Logging.jobs.isDebugEnabled())
+            {
+              Logging.jobs.debug("Signalled for job start for job "+jobID);
+            }
+            break;
+          case Jobs.STATUS_ACTIVEWAIT:
+            unwaitList.add(jobID);
+            jobs.unwaitJob(jobID,Jobs.STATUS_RESUMING,windowEnd);
+            jobQueue.clearFailTimes(jobID);
+            if (Logging.jobs.isDebugEnabled())
+            {
+              Logging.jobs.debug("Un-waited job "+jobID);
+            }
+            break;
+          case Jobs.STATUS_ACTIVEWAITSEEDING:
+            unwaitList.add(jobID);
+            jobs.unwaitJob(jobID,Jobs.STATUS_RESUMINGSEEDING,windowEnd);
+            jobQueue.clearFailTimes(jobID);
+            if (Logging.jobs.isDebugEnabled())
+            {
+              Logging.jobs.debug("Un-waited job "+jobID);
+            }
+            break;
+          case Jobs.STATUS_PAUSEDWAIT:
+            unwaitList.add(jobID);
+            jobs.unwaitJob(jobID,jobs.STATUS_PAUSED,windowEnd);
+            if (Logging.jobs.isDebugEnabled())
+            {
+              Logging.jobs.debug("Un-waited (but still paused) job "+jobID);
+            }
+            break;
+          case Jobs.STATUS_PAUSEDWAITSEEDING:
+            unwaitList.add(jobID);
+            jobs.unwaitJob(jobID,jobs.STATUS_PAUSEDSEEDING,windowEnd);
+            if (Logging.jobs.isDebugEnabled())
+            {
+              Logging.jobs.debug("Un-waited (but still paused) job "+jobID);
+            }
+            break;
+          case Jobs.STATUS_PAUSINGWAITING:
+            unwaitList.add(jobID);
+            jobs.unwaitJob(jobID,jobs.STATUS_PAUSING,windowEnd);
+            if (Logging.jobs.isDebugEnabled())
+            {
+              Logging.jobs.debug("Un-waited (but still paused) job "+jobID);
+            }
+            break;
+          case Jobs.STATUS_PAUSINGWAITINGSEEDING:
+            unwaitList.add(jobID);
+            jobs.unwaitJob(jobID,jobs.STATUS_PAUSINGSEEDING,windowEnd);
+            if (Logging.jobs.isDebugEnabled())
+            {
+              Logging.jobs.debug("Un-waited (but still paused) job "+jobID);
+            }
+            break;
+          default:
+            break;
           }
-          break;
-        default:
-          break;
-        }
 
+        }
+        database.performCommit();
+        return;
+      }
+      catch (ManifoldCFException e)
+      {
+        database.signalRollback();
+        if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+        {
+          if (Logging.perf.isDebugEnabled())
+            Logging.perf.debug("Aborted transaction resetting for restart: "+e.getMessage());
+          sleepAmt = getRandomAmount();
+          continue;
+        }
+        throw e;
+      }
+      catch (Error e)
+      {
+        database.signalRollback();
+        throw e;
+      }
+      finally
+      {
+        database.endTransaction();
+        sleepFor(sleepAmt);
       }
-    }
-    catch (ManifoldCFException e)
-    {
-      database.signalRollback();
-      throw e;
-    }
-    catch (Error e)
-    {
-      database.signalRollback();
-      throw e;
-    }
-    finally
-    {
-      database.endTransaction();
     }
   }