You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by bo...@apache.org on 2009/09/24 00:09:51 UTC
svn commit: r818279 -
/ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
Author: boisvert
Date: Wed Sep 23 22:09:50 2009
New Revision: 818279
URL: http://svn.apache.org/viewvc?rev=818279&view=rev
Log:
-Fix issue with retry logic and rollback
-Fix issue where a job could be dispatched twice (concurrently) under load
-Improve stability of scheduler under load by preventing overload
(with contributions from Alexis Midon and Matthieu Riou)
Modified:
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
Modified: ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java?rev=818279&r1=818278&r2=818279&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java (original)
+++ ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java Wed Sep 23 22:09:50 2009
@@ -38,24 +38,25 @@
import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.iapi.ContextException;
import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.log4j.helpers.AbsoluteTimeDateFormat;
/**
- * A reliable and relatively simple scheduler that uses a database to persist information about
+ * A reliable and relatively simple scheduler that uses a database to persist information about
* scheduled tasks.
- *
+ *
* The challenge is to achieve high performance in a small memory footprint without loss of reliability
* while supporting distributed/clustered configurations.
- *
- * The design is based around three time horizons: "immediate", "near future", and "everything else".
+ *
+ * The design is based around three time horizons: "immediate", "near future", and "everything else".
* Immediate jobs (i.e. jobs that are about to be up) are written to the database and kept in
* an in-memory priority queue. When they execute, they are removed from the database. Near future
* jobs are placed in the database and assigned to the current node, however they are not stored in
* memory. Periodically jobs are "upgraded" from near-future to immediate status, at which point they
- * get loaded into memory. Jobs that are further out in time, are placed in the database without a
+ * get loaded into memory. Jobs that are further out in time, are placed in the database without a
* node identifer; when they are ready to be "upgraded" to near-future jobs they are assigned to one
- * of the known live nodes. Recovery is rather straighforward, with stale node identifiers being
- * reassigned to known good nodes.
- *
+ * of the known live nodes. Recovery is rather straighforward, with stale node identifiers being
+ * reassigned to known good nodes.
+ *
* @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
*
*/
@@ -77,6 +78,9 @@
/** 10s of no communication and you are deemed dead. */
long _staleInterval = 10000;
+ /** Duration used to log a warning if a job scheduled at a date D is queued at D'>D+_warningDelay */
+ long _warningDelay = 5*60*1000;
+
/**
* Estimated sustained transaction per second capacity of the system.
* e.g. 100 means the system can process 100 jobs per seconds, on average
@@ -97,7 +101,7 @@
volatile JobProcessor _jobProcessor;
volatile JobProcessor _polledRunnableProcessor;
-
+
private SchedulerThread _todo;
private DatabaseDelegate _db;
@@ -121,7 +125,7 @@
private Random _random = new Random();
private long _pollIntervalForPolledRunnable = Long.getLong("org.apache.ode.polledRunnable.pollInterval", 10 * 60 * 1000);
-
+
public SimpleScheduler(String nodeId, DatabaseDelegate del, Properties conf) {
_nodeId = nodeId;
_db = del;
@@ -130,9 +134,10 @@
_nearFutureInterval = getLongProperty(conf, "ode.scheduler.nearFutureInterval", _nearFutureInterval);
_staleInterval = getLongProperty(conf, "ode.scheduler.staleInterval", _staleInterval);
_tps = getIntProperty(conf, "ode.scheduler.transactionsPerSecond", _tps);
+ _warningDelay = getLongProperty(conf, "ode.scheduler.warningDelay", _warningDelay);
_todo = new SchedulerThread(this);
}
-
+
public void setPollIntervalForPolledRunnable(long pollIntervalForPolledRunnable) {
_pollIntervalForPolledRunnable = pollIntervalForPolledRunnable;
}
@@ -148,7 +153,7 @@
if (s != null) return Long.parseLong(s);
else return defaultValue;
}
-
+
public void setNodeId(String nodeId) {
_nodeId = nodeId;
}
@@ -218,7 +223,7 @@
String errmsg = "Internal Error, could not begin transaction.";
throw new ContextException(errmsg, ex);
}
-
+
boolean success = false;
try {
T retval = transaction.call();
@@ -278,7 +283,7 @@
Map<String, Object> jobDetails = new HashMap<String, Object>();
jobDetails.put("runnable", runnable);
runnable.storeToDetailsMap(jobDetails);
-
+
if (__log.isDebugEnabled())
__log.debug("scheduling " + jobDetails + " for " + when);
@@ -292,9 +297,9 @@
if (immediate) {
// Immediate scheduling means we put it in the DB for safe keeping
_db.insertJob(job, _nodeId, true);
-
+
// And add it to our todo list .
- if (_todo.size() < _todoLimit) {
+ if (_outstandingJobs.size() < _todoLimit) {
addTodoOnCommit(job);
}
__log.debug("scheduled immediate job: " + job.jobId);
@@ -362,7 +367,7 @@
}
long now = System.currentTimeMillis();
-
+
// Pretend we got a heartbeat...
for (String s : _knownNodes) _lastHeartBeat.put(s, now);
@@ -378,11 +383,11 @@
_todo.start();
_running = true;
}
-
+
private long randomMean(long mean) {
return (long) _random.nextDouble() * mean + (mean/2);
}
-
+
public synchronized void stop() {
if (!_running)
return;
@@ -393,6 +398,10 @@
_todo.clearTasks(CheckStaleNodes.class);
_outstandingJobs.clear();
+ // disable because this is not the right way to do it
+ // will be fixed by ODE-595
+ // graceful shutdown; any new submits will throw RejectedExecutionExceptions
+// _exec.shutdown();
_running = false;
}
@@ -409,24 +418,18 @@
public Void call() throws Exception {
try {
if (job.transacted) {
+ final Object[] needRetry = new Object[] { false };
try {
execTransaction(new Callable<Void>() {
public Void call() throws Exception {
if (job.persisted)
if (!_db.deleteJob(job.jobId, _nodeId))
throw new JobNoLongerInDbException(job.jobId,_nodeId);
-
try {
_jobProcessor.onScheduledJob(jobInfo);
} catch (JobProcessorException jpe) {
if (jpe.retry) {
- int retry = job.detail.get("retry") != null ? (((Integer)job.detail.get("retry")) + 1) : 0;
- if (retry <= 10) {
- long delay = doRetry(job);
- __log.error("Error while processing transaction, retrying in " + delay + "s");
- } else {
- __log.error("Error while processing transaction after 10 retries, no more retries:"+job);
- }
+ needRetry[0] = true;
} else {
__log.error("Error while processing transaction, no retry.", jpe);
}
@@ -440,15 +443,36 @@
// This may happen if two node try to do the same job... we try to avoid
// it the synchronization is a best-effort but not perfect.
__log.debug("job no longer in db forced rollback.");
- } catch (Exception ex) {
- __log.error("Error while executing transaction", ex);
+ } catch (final Exception ex) {
+ // We only get here if the above execTransaction fails, so that transaction got
+ // rolled back already
+ execTransaction(new Callable<Void>() {
+ public Void call() throws Exception {
+ if ((Boolean)needRetry[0]) {
+ int retry = job.detail.get("retry") != null ? (((Integer)job.detail.get("retry")) + 1) : 0;
+ if (retry <= 10) {
+ long delay = doRetry(job);
+ __log.error("Error while processing transaction, retrying in " + delay + "s");
+ } else {
+ __log.error("Error while processing transaction after 10 retries, no more retries:"+job);
+ }
+ }
+
+ // We got rolled back; as we schedule a retry we want to be sure the original job is
+ // really deleted
+ if (job.persisted) _db.deleteJob(job.jobId, _nodeId);
+
+ __log.error("Error while executing transaction", ex);
+ return null;
+ }
+ });
}
} else {
_jobProcessor.onScheduledJob(jobInfo);
}
return null;
} finally {
- _outstandingJobs.remove(job.jobId);
+ _outstandingJobs.remove(job.jobId);
}
}
});
@@ -469,7 +493,7 @@
* <li>5. System powered off and restarts; the poller job does not know what the status
* of the runnable. This is handled just like the case #1.</li>
* </ul>
- *
+ *
* There is at least one re-scheduling of the poller job. Since, the runnable's state is
* not persisted, and the same runnable may be tried again after system failure,
* the runnable that's used with this polling should be repeatable.
@@ -487,7 +511,7 @@
public Void call() throws Exception {
if (!_db.deleteJob(job.jobId, _nodeId))
throw new JobNoLongerInDbException(job.jobId,_nodeId);
-
+
try {
_polledRunnableProcessor.onScheduledJob(jobInfo);
if( !"COMPLETED".equals(String.valueOf(jobInfo.jobDetail.get("runnable_status"))) ) {
@@ -530,7 +554,7 @@
}
});
}
-
+
private void addTodoOnCommit(final Job job) {
registerSynchronizer(new Synchronizer() {
public void afterCompletion(boolean success) {
@@ -578,27 +602,48 @@
boolean doLoadImmediate() {
__log.debug("LOAD IMMEDIATE started");
-
+
// don't load anything if we're already half-full; we've got plenty to do already
- if (_todo.size() > _todoLimit/2) return true;
-
+ if (_outstandingJobs.size() > _todoLimit/2) return true;
+
List<Job> jobs;
try {
- final int batch = (int) (_immediateInterval * _tps / 1000);
+ // don't load more than we can chew
+ final int batch = Math.min((int) (_immediateInterval * _tps / 1000), _todoLimit-_outstandingJobs.size());
+
+ // jobs might have been enqueued by #addTodoOnCommit meanwhile
+ if (batch<=0) {
+ if (__log.isDebugEnabled()) __log.debug("Max capacity reached: "+_outstandingJobs.size()+" jobs dispatched i.e. queued or being executed");
+ return true;
+ }
+
+ if (__log.isDebugEnabled()) __log.debug("loading "+batch+" jobs from db");
jobs = execTransaction(new Callable<List<Job>>() {
public List<Job> call() throws Exception {
return _db.dequeueImmediate(_nodeId, System.currentTimeMillis() + _immediateInterval, batch);
}
});
+ long delayedTime = System.currentTimeMillis() - _warningDelay;
+ int delayedCount = 0;
+ boolean runningLate;
+ AbsoluteTimeDateFormat f = new AbsoluteTimeDateFormat();
for (Job j : jobs) {
- if (__log.isDebugEnabled())
- __log.debug("todo.enqueue job from db: " + j.jobId + " for " + j.schedDate);
-
- if (_todo.size() >= _todoLimit)
+ // jobs might have been enqueued by #addTodoOnCommit meanwhile
+ if (_outstandingJobs.size() >= _todoLimit){
+ if (__log.isDebugEnabled()) __log.debug("Max capacity reached: "+_outstandingJobs.size()+" jobs dispatched i.e. queued or being executed");
break;
-
+ }
+ runningLate = j.schedDate <= delayedTime;
+ if (runningLate) {
+ delayedCount++;
+ }
+ if (__log.isDebugEnabled())
+ __log.debug("todo.enqueue job from db: " + j.jobId + " for " + j.schedDate + "(" + f.format(j.schedDate)+") "+(runningLate?" delayed=true":""));
enqueue(j);
}
+ if (delayedCount > 0) {
+ __log.warn("Dispatching jobs with more than "+(_warningDelay/60000)+" minutes delay. Either the server was down for some time or the job load is greater than available capacity");
+ }
return true;
} catch (Exception ex) {
__log.error("Error loading immediate jobs from database.", ex);
@@ -609,16 +654,13 @@
}
void enqueue(Job job) {
- Long outstanding = _outstandingJobs.get(job.jobId);
- if (outstanding != null && System.currentTimeMillis()-outstanding.longValue() > 60*60*1000) {
- __log.error("Stale outstanding job: "+job.jobId);
- outstanding = null;
- }
+ Long outstanding = _outstandingJobs.putIfAbsent(job.jobId, job.schedDate);
if (outstanding == null) {
- _outstandingJobs.put(job.jobId, System.currentTimeMillis());
- _todo.enqueue(job);
- } else {
- __log.info("Outstanding job: "+job.jobId);
+ if (job.schedDate <= System.currentTimeMillis()) {
+ runJob(job);
+ } else {
+ _todo.enqueue(job);
+ }
}
}