You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@manifoldcf.apache.org by kw...@apache.org on 2012/12/12 03:30:37 UTC
svn commit: r1420513 - in /manifoldcf/trunk: ./
framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
Author: kwright
Date: Wed Dec 12 02:30:36 2012
New Revision: 1420513
URL: http://svn.apache.org/viewvc?rev=1420513&view=rev
Log:
Missed piece of CONNECTORS-584: loop on job start, since it lock-times-out a fair bit under mysql.
Modified:
manifoldcf/trunk/ (props changed)
manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
Propchange: manifoldcf/trunk/
------------------------------------------------------------------------------
Merged /manifoldcf/branches/CONNECTORS-584:r1420512
Modified: manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java
URL: http://svn.apache.org/viewvc/manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java?rev=1420513&r1=1420512&r2=1420513&view=diff
==============================================================================
--- manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java (original)
+++ manifoldcf/trunk/framework/pull-agent/src/main/java/org/apache/manifoldcf/crawler/jobs/JobManager.java Wed Dec 12 02:30:36 2012
@@ -4684,223 +4684,237 @@ public class JobManager implements IJobM
// Note well: We can't combine locks across both our lock manager and the database unless we do it consistently. The
// consistent practice throughout CF is to do the external locks first, then the database locks. This particular method
// thus cannot use cached job description information, because it must throw database locks first against the jobs table.
- database.beginTransaction();
- try
+ while (true)
{
- // First, query the appropriate fields of all jobs.
- StringBuilder sb = new StringBuilder("SELECT ");
- ArrayList list = new ArrayList();
-
- sb.append(jobs.idField).append(",")
- .append(jobs.lastTimeField).append(",")
- .append(jobs.statusField).append(",")
- .append(jobs.startMethodField).append(",")
- .append(jobs.outputNameField).append(",")
- .append(jobs.connectionNameField)
- .append(" FROM ").append(jobs.getTableName()).append(" WHERE ")
- .append(database.buildConjunctionClause(list,new ClauseDescription[]{
- new MultiClause(jobs.statusField,new Object[]{
- jobs.statusToString(jobs.STATUS_INACTIVE),
- jobs.statusToString(jobs.STATUS_ACTIVEWAIT),
- jobs.statusToString(jobs.STATUS_ACTIVEWAITSEEDING),
- jobs.statusToString(jobs.STATUS_PAUSEDWAIT),
- jobs.statusToString(jobs.STATUS_PAUSEDWAITSEEDING)})})).append(" AND ")
- .append(jobs.startMethodField).append("!=? FOR UPDATE");
-
- list.add(jobs.startMethodToString(IJobDescription.START_DISABLE));
-
- IResultSet set = database.performQuery(sb.toString(),list,null,null);
-
- // Next, we query for the schedule information. In order to do that, we amass a list of job identifiers that we want schedule info
- // for.
- Long[] jobIDSet = new Long[set.getRowCount()];
- int i = 0;
- while (i < set.getRowCount())
- {
- IResultRow row = set.getRow(i);
- jobIDSet[i++] = (Long)row.getValue(jobs.idField);
- }
-
- ScheduleRecord[][] srSet = jobs.readScheduleRecords(jobIDSet);
-
- i = 0;
- while (i < set.getRowCount())
+ long sleepAmt = 0L;
+ database.beginTransaction();
+ try
{
- IResultRow row = set.getRow(i);
-
- Long jobID = (Long)row.getValue(jobs.idField);
- int startMethod = jobs.stringToStartMethod((String)row.getValue(jobs.startMethodField));
- String outputName = (String)row.getValue(jobs.outputNameField);
- String connectionName = (String)row.getValue(jobs.connectionNameField);
- ScheduleRecord[] thisSchedule = srSet[i++];
-
- // Run at specific times
+ // First, query the appropriate fields of all jobs.
+ StringBuilder sb = new StringBuilder("SELECT ");
+ ArrayList list = new ArrayList();
+
+ sb.append(jobs.idField).append(",")
+ .append(jobs.lastTimeField).append(",")
+ .append(jobs.statusField).append(",")
+ .append(jobs.startMethodField).append(",")
+ .append(jobs.outputNameField).append(",")
+ .append(jobs.connectionNameField)
+ .append(" FROM ").append(jobs.getTableName()).append(" WHERE ")
+ .append(database.buildConjunctionClause(list,new ClauseDescription[]{
+ new MultiClause(jobs.statusField,new Object[]{
+ jobs.statusToString(jobs.STATUS_INACTIVE),
+ jobs.statusToString(jobs.STATUS_ACTIVEWAIT),
+ jobs.statusToString(jobs.STATUS_ACTIVEWAITSEEDING),
+ jobs.statusToString(jobs.STATUS_PAUSEDWAIT),
+ jobs.statusToString(jobs.STATUS_PAUSEDWAITSEEDING)})})).append(" AND ")
+ .append(jobs.startMethodField).append("!=? FOR UPDATE");
+
+ list.add(jobs.startMethodToString(IJobDescription.START_DISABLE));
+
+ IResultSet set = database.performQuery(sb.toString(),list,null,null);
- // We need to start with the start time as given, plus one
- long startInterval = ((Long)row.getValue(jobs.lastTimeField)).longValue() + 1;
- if (Logging.jobs.isDebugEnabled())
- Logging.jobs.debug("Checking if job "+jobID.toString()+" needs to be started; it was last checked at "+
- new Long(startInterval).toString()+", and now it is "+new Long(currentTime).toString());
+ // Next, we query for the schedule information. In order to do that, we amass a list of job identifiers that we want schedule info
+ // for.
+ Long[] jobIDSet = new Long[set.getRowCount()];
+ int i = 0;
+ while (i < set.getRowCount())
+ {
+ IResultRow row = set.getRow(i);
+ jobIDSet[i++] = (Long)row.getValue(jobs.idField);
+ }
- // Proceed to the current time, and find a match if there is one to be found.
- // If not -> continue
+ ScheduleRecord[][] srSet = jobs.readScheduleRecords(jobIDSet);
- // We go through *all* the schedule records. The one that matches that has the latest
- // end time is the one we take.
- int l = 0;
- Long matchTime = null;
- Long duration = null;
- while (l < thisSchedule.length)
- {
- long trialStartInterval = startInterval;
- ScheduleRecord sr = thisSchedule[l++];
- Long thisDuration = sr.getDuration();
- if (startMethod == IJobDescription.START_WINDOWINSIDE &&
- thisDuration != null)
- {
- // Bump the start interval back before the beginning of the current interval.
- // This will guarantee a start as long as there is time in the window.
- long trialStart = currentTime - thisDuration.longValue();
- if (trialStart < trialStartInterval)
- trialStartInterval = trialStart;
- }
+ i = 0;
+ while (i < set.getRowCount())
+ {
+ IResultRow row = set.getRow(i);
- Long thisMatchTime = checkTimeMatch(trialStartInterval,currentTime,
- sr.getDayOfWeek(),
- sr.getDayOfMonth(),
- sr.getMonthOfYear(),
- sr.getYear(),
- sr.getHourOfDay(),
- sr.getMinutesOfHour(),
- sr.getTimezone(),
- thisDuration);
+ Long jobID = (Long)row.getValue(jobs.idField);
+ int startMethod = jobs.stringToStartMethod((String)row.getValue(jobs.startMethodField));
+ String outputName = (String)row.getValue(jobs.outputNameField);
+ String connectionName = (String)row.getValue(jobs.connectionNameField);
+ ScheduleRecord[] thisSchedule = srSet[i++];
+
+ // Run at specific times
+
+ // We need to start with the start time as given, plus one
+ long startInterval = ((Long)row.getValue(jobs.lastTimeField)).longValue() + 1;
+ if (Logging.jobs.isDebugEnabled())
+ Logging.jobs.debug("Checking if job "+jobID.toString()+" needs to be started; it was last checked at "+
+ new Long(startInterval).toString()+", and now it is "+new Long(currentTime).toString());
+
+ // Proceed to the current time, and find a match if there is one to be found.
+ // If not -> continue
+
+ // We go through *all* the schedule records. The one that matches that has the latest
+ // end time is the one we take.
+ int l = 0;
+ Long matchTime = null;
+ Long duration = null;
+ while (l < thisSchedule.length)
+ {
+ long trialStartInterval = startInterval;
+ ScheduleRecord sr = thisSchedule[l++];
+ Long thisDuration = sr.getDuration();
+ if (startMethod == IJobDescription.START_WINDOWINSIDE &&
+ thisDuration != null)
+ {
+ // Bump the start interval back before the beginning of the current interval.
+ // This will guarantee a start as long as there is time in the window.
+ long trialStart = currentTime - thisDuration.longValue();
+ if (trialStart < trialStartInterval)
+ trialStartInterval = trialStart;
+ }
+
+ Long thisMatchTime = checkTimeMatch(trialStartInterval,currentTime,
+ sr.getDayOfWeek(),
+ sr.getDayOfMonth(),
+ sr.getMonthOfYear(),
+ sr.getYear(),
+ sr.getHourOfDay(),
+ sr.getMinutesOfHour(),
+ sr.getTimezone(),
+ thisDuration);
+
+ if (thisMatchTime == null)
+ {
+ if (Logging.jobs.isDebugEnabled())
+ Logging.jobs.debug(" No time match found within interval "+new Long(trialStartInterval).toString()+
+ " to "+new Long(currentTime).toString());
+ continue;
+ }
- if (thisMatchTime == null)
- {
if (Logging.jobs.isDebugEnabled())
- Logging.jobs.debug(" No time match found within interval "+new Long(trialStartInterval).toString()+
+ Logging.jobs.debug(" Time match FOUND within interval "+new Long(trialStartInterval).toString()+
" to "+new Long(currentTime).toString());
- continue;
- }
- if (Logging.jobs.isDebugEnabled())
- Logging.jobs.debug(" Time match FOUND within interval "+new Long(trialStartInterval).toString()+
- " to "+new Long(currentTime).toString());
+ if (matchTime == null || thisDuration == null ||
+ (duration != null && thisMatchTime.longValue() + thisDuration.longValue() >
+ matchTime.longValue() + duration.longValue()))
+ {
+ matchTime = thisMatchTime;
+ duration = thisDuration;
+ }
+ }
- if (matchTime == null || thisDuration == null ||
- (duration != null && thisMatchTime.longValue() + thisDuration.longValue() >
- matchTime.longValue() + duration.longValue()))
+ if (matchTime == null)
{
- matchTime = thisMatchTime;
- duration = thisDuration;
+ jobs.updateLastTime(jobID,currentTime);
+ continue;
}
- }
-
- if (matchTime == null)
- {
- jobs.updateLastTime(jobID,currentTime);
- continue;
- }
-
- int status = jobs.stringToStatus(row.getValue(jobs.statusField).toString());
+ int status = jobs.stringToStatus(row.getValue(jobs.statusField).toString());
- // Calculate the end of the window
- Long windowEnd = null;
- if (duration != null)
- {
- windowEnd = new Long(matchTime.longValue()+duration.longValue());
- }
- if (Logging.jobs.isDebugEnabled())
- {
- Logging.jobs.debug("Job '"+jobID+"' is within run window at "+new Long(currentTime).toString()+" ms. (which starts at "+
- matchTime.toString()+" ms."+((duration==null)?"":(" and goes for "+duration.toString()+" ms."))+")");
- }
-
- int newJobState;
- switch (status)
- {
- case Jobs.STATUS_INACTIVE:
- // If job was formerly "inactive", do the full startup.
- // Start this job! but with no end time.
- // This does not get logged because the startup thread does the logging.
- jobs.startJob(jobID,windowEnd);
- jobQueue.clearFailTimes(jobID);
- if (Logging.jobs.isDebugEnabled())
+ // Calculate the end of the window
+ Long windowEnd = null;
+ if (duration != null)
{
- Logging.jobs.debug("Signalled for job start for job "+jobID);
+ windowEnd = new Long(matchTime.longValue()+duration.longValue());
}
- break;
- case Jobs.STATUS_ACTIVEWAIT:
- unwaitList.add(jobID);
- jobs.unwaitJob(jobID,Jobs.STATUS_RESUMING,windowEnd);
- jobQueue.clearFailTimes(jobID);
- if (Logging.jobs.isDebugEnabled())
- {
- Logging.jobs.debug("Un-waited job "+jobID);
- }
- break;
- case Jobs.STATUS_ACTIVEWAITSEEDING:
- unwaitList.add(jobID);
- jobs.unwaitJob(jobID,Jobs.STATUS_RESUMINGSEEDING,windowEnd);
- jobQueue.clearFailTimes(jobID);
- if (Logging.jobs.isDebugEnabled())
- {
- Logging.jobs.debug("Un-waited job "+jobID);
- }
- break;
- case Jobs.STATUS_PAUSEDWAIT:
- unwaitList.add(jobID);
- jobs.unwaitJob(jobID,jobs.STATUS_PAUSED,windowEnd);
- if (Logging.jobs.isDebugEnabled())
- {
- Logging.jobs.debug("Un-waited (but still paused) job "+jobID);
- }
- break;
- case Jobs.STATUS_PAUSEDWAITSEEDING:
- unwaitList.add(jobID);
- jobs.unwaitJob(jobID,jobs.STATUS_PAUSEDSEEDING,windowEnd);
- if (Logging.jobs.isDebugEnabled())
- {
- Logging.jobs.debug("Un-waited (but still paused) job "+jobID);
- }
- break;
- case Jobs.STATUS_PAUSINGWAITING:
- unwaitList.add(jobID);
- jobs.unwaitJob(jobID,jobs.STATUS_PAUSING,windowEnd);
+
if (Logging.jobs.isDebugEnabled())
{
- Logging.jobs.debug("Un-waited (but still paused) job "+jobID);
+ Logging.jobs.debug("Job '"+jobID+"' is within run window at "+new Long(currentTime).toString()+" ms. (which starts at "+
+ matchTime.toString()+" ms."+((duration==null)?"":(" and goes for "+duration.toString()+" ms."))+")");
}
- break;
- case Jobs.STATUS_PAUSINGWAITINGSEEDING:
- unwaitList.add(jobID);
- jobs.unwaitJob(jobID,jobs.STATUS_PAUSINGSEEDING,windowEnd);
- if (Logging.jobs.isDebugEnabled())
+
+ int newJobState;
+ switch (status)
{
- Logging.jobs.debug("Un-waited (but still paused) job "+jobID);
+ case Jobs.STATUS_INACTIVE:
+ // If job was formerly "inactive", do the full startup.
+ // Start this job! but with no end time.
+ // This does not get logged because the startup thread does the logging.
+ jobs.startJob(jobID,windowEnd);
+ jobQueue.clearFailTimes(jobID);
+ if (Logging.jobs.isDebugEnabled())
+ {
+ Logging.jobs.debug("Signalled for job start for job "+jobID);
+ }
+ break;
+ case Jobs.STATUS_ACTIVEWAIT:
+ unwaitList.add(jobID);
+ jobs.unwaitJob(jobID,Jobs.STATUS_RESUMING,windowEnd);
+ jobQueue.clearFailTimes(jobID);
+ if (Logging.jobs.isDebugEnabled())
+ {
+ Logging.jobs.debug("Un-waited job "+jobID);
+ }
+ break;
+ case Jobs.STATUS_ACTIVEWAITSEEDING:
+ unwaitList.add(jobID);
+ jobs.unwaitJob(jobID,Jobs.STATUS_RESUMINGSEEDING,windowEnd);
+ jobQueue.clearFailTimes(jobID);
+ if (Logging.jobs.isDebugEnabled())
+ {
+ Logging.jobs.debug("Un-waited job "+jobID);
+ }
+ break;
+ case Jobs.STATUS_PAUSEDWAIT:
+ unwaitList.add(jobID);
+ jobs.unwaitJob(jobID,jobs.STATUS_PAUSED,windowEnd);
+ if (Logging.jobs.isDebugEnabled())
+ {
+ Logging.jobs.debug("Un-waited (but still paused) job "+jobID);
+ }
+ break;
+ case Jobs.STATUS_PAUSEDWAITSEEDING:
+ unwaitList.add(jobID);
+ jobs.unwaitJob(jobID,jobs.STATUS_PAUSEDSEEDING,windowEnd);
+ if (Logging.jobs.isDebugEnabled())
+ {
+ Logging.jobs.debug("Un-waited (but still paused) job "+jobID);
+ }
+ break;
+ case Jobs.STATUS_PAUSINGWAITING:
+ unwaitList.add(jobID);
+ jobs.unwaitJob(jobID,jobs.STATUS_PAUSING,windowEnd);
+ if (Logging.jobs.isDebugEnabled())
+ {
+ Logging.jobs.debug("Un-waited (but still paused) job "+jobID);
+ }
+ break;
+ case Jobs.STATUS_PAUSINGWAITINGSEEDING:
+ unwaitList.add(jobID);
+ jobs.unwaitJob(jobID,jobs.STATUS_PAUSINGSEEDING,windowEnd);
+ if (Logging.jobs.isDebugEnabled())
+ {
+ Logging.jobs.debug("Un-waited (but still paused) job "+jobID);
+ }
+ break;
+ default:
+ break;
}
- break;
- default:
- break;
- }
+ }
+ database.performCommit();
+ return;
+ }
+ catch (ManifoldCFException e)
+ {
+ database.signalRollback();
+ if (e.getErrorCode() == e.DATABASE_TRANSACTION_ABORT)
+ {
+ if (Logging.perf.isDebugEnabled())
+ Logging.perf.debug("Aborted transaction resetting for restart: "+e.getMessage());
+ sleepAmt = getRandomAmount();
+ continue;
+ }
+ throw e;
+ }
+ catch (Error e)
+ {
+ database.signalRollback();
+ throw e;
+ }
+ finally
+ {
+ database.endTransaction();
+ sleepFor(sleepAmt);
}
- }
- catch (ManifoldCFException e)
- {
- database.signalRollback();
- throw e;
- }
- catch (Error e)
- {
- database.signalRollback();
- throw e;
- }
- finally
- {
- database.endTransaction();
}
}