You are viewing a plain-text version of this content. (The canonical link to the original page was a hyperlink that is not preserved in this text.)
Posted to commits@ode.apache.org by bo...@apache.org on 2008/11/15 14:52:09 UTC
svn commit: r714258 - /ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
Author: boisvert
Date: Sat Nov 15 05:52:08 2008
New Revision: 714258
URL: http://svn.apache.org/viewvc?rev=714258&view=rev
Log:
ODE-424: SimpleScheduler creates duplicate jobs when job execution fails
Modified:
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
Modified: ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java?rev=714258&r1=714257&r2=714258&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java (original)
+++ ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java Sat Nov 15 05:52:08 2008
@@ -69,7 +69,7 @@
long _immediateInterval = 30000;
/**
- * Jobs sccheduled with a time that is between (now+immediateInterval,now+nearFutureInterval) will be assigned to the current
+ * Jobs scheduled with a time that is between (now+immediateInterval,now+nearFutureInterval) will be assigned to the current
* node, but will not be placed on the todo queue (the promoter will pick them up).
*/
long _nearFutureInterval = 10 * 60 * 1000;
@@ -77,6 +77,13 @@
/** 10s of no communication and you are deemed dead. */
long _staleInterval = 10000;
+ /**
+ * Estimated sustained transaction per second capacity of the system.
+ * e.g. 100 means the system can process 100 jobs per second, on average.
+ * This number is used to determine how many jobs to load from the database at once.
+ */
+ int _tps = 100;
+
TransactionManager _txm;
ExecutorService _exec;
@@ -132,6 +139,10 @@
_nearFutureInterval = nearFutureInterval;
}
+ public void setTransactionPerSecond(int tps) {
+ _tps = tps;
+ }
+
public void setTransactionManager(TransactionManager txm) {
_txm = txm;
}
@@ -227,25 +238,20 @@
try {
if (immediate) {
- // If we have too many jobs in the queue, we don't allow any new ones
- if (_todo.size() > _todoLimit) {
- __log.error("The execution queue is backed up, the engine can't keep up with the load. Either " +
- "increase the queue size or regulate the flow.");
- return null;
- }
-
// Immediate scheduling means we put it in the DB for safe keeping
_db.insertJob(job, _nodeId, true);
+
// And add it to our todo list .
- addTodoOnCommit(job);
-
+ if (_todo.size() < _todoLimit) {
+ addTodoOnCommit(job);
+ }
__log.debug("scheduled immediate job: " + job.jobId);
} else if (nearfuture) {
// Near future, assign the job to ourselves (why? -- this makes it very unlikely that we
// would get two nodes trying to process the same instance, which causes unsightly rollbacks).
_db.insertJob(job, _nodeId, false);
__log.debug("scheduled near-future job: " + job.jobId);
- } else /* far future */{
+ } else /* far future */ {
// Not the near future, we don't assign a node-id, we'll assign it later.
_db.insertJob(job, null, false);
__log.debug("scheduled far-future job: " + job.jobId);
@@ -303,18 +309,19 @@
throw new ContextException("Error retrieving node list.", ex);
}
+ long now = System.currentTimeMillis();
+
// Pretend we got a heartbeat...
- for (String s : _knownNodes)
- _lastHeartBeat.put(s, System.currentTimeMillis());
+ for (String s : _knownNodes) _lastHeartBeat.put(s, now);
// schedule immediate job loading for now!
- _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis()));
+ _todo.enqueue(new LoadImmediateTask(now));
// schedule check for stale nodes, make it random so that the nodes don't overlap.
- _todo.enqueue(new CheckStaleNodes(System.currentTimeMillis() + (long) (_random.nextDouble() * _staleInterval)));
+ _todo.enqueue(new CheckStaleNodes(now + (long) (_random.nextDouble() * _staleInterval + (_staleInterval/2))));
// do the upgrade sometime (random) in the immediate interval.
- _todo.enqueue(new UpgradeJobsTask(System.currentTimeMillis() + (long) (_random.nextDouble() * _immediateInterval)));
+ _todo.enqueue(new UpgradeJobsTask(now + (long) (_random.nextDouble() * _immediateInterval + (_immediateInterval/2))));
_todo.start();
_running = true;
@@ -347,10 +354,25 @@
try {
execTransaction(new Callable<Void>() {
public Void call() throws Exception {
- _jobProcessor.onScheduledJob(jobInfo);
if (job.persisted)
if (!_db.deleteJob(job.jobId, _nodeId))
throw new JobNoLongerInDbException(job.jobId,_nodeId);
+
+ try {
+ _jobProcessor.onScheduledJob(jobInfo);
+ } catch (JobProcessorException jpe) {
+ if (jpe.retry) {
+ int retry = job.detail.get("retry") != null ? (((Integer)job.detail.get("retry")) + 1) : 0;
+ if (retry <= 10) {
+ long delay = doRetry(job);
+ __log.error("Error while processing transaction, retrying in " + delay + "s");
+ } else {
+ __log.error("Error while processing transaction after 10 retries, no more retries:"+job);
+ }
+ } else {
+ __log.error("Error while processing transaction, no retry.", jpe);
+ }
+ }
return null;
}
});
@@ -358,11 +380,6 @@
// This may happen if two node try to do the same job... we try to avoid
// it the synchronization is a best-effort but not perfect.
__log.debug("job no longer in db forced rollback.");
- } catch (JobProcessorException jpe) {
- if (jpe.retry)
- __log.error("Error while processing transaction, retrying in " + doRetry(job) + "s");
- else
- __log.error("Error while processing transaction, no retry.", jpe);
} catch (Exception ex) {
__log.error("Error while executing transaction", ex);
}
@@ -418,21 +435,25 @@
boolean doLoadImmediate() {
__log.debug("LOAD IMMEDIATE started");
+
+ // don't load anything if we're already half-full; we've got plenty to do already
+ if (_todo.size() > _todoLimit/2) return true;
+
List<Job> jobs;
try {
- do {
- jobs = execTransaction(new Callable<List<Job>>() {
- public List<Job> call() throws Exception {
- return _db.dequeueImmediate(_nodeId, System.currentTimeMillis() + _immediateInterval, 10);
- }
- });
- for (Job j : jobs) {
- if (__log.isDebugEnabled())
- __log.debug("todo.enqueue job from db: " + j.jobId + " for " + j.schedDate);
+ final int batch = (int) (_immediateInterval * _tps / 1000);
+ jobs = execTransaction(new Callable<List<Job>>() {
+ public List<Job> call() throws Exception {
+ return _db.dequeueImmediate(_nodeId, System.currentTimeMillis() + _immediateInterval, batch);
+ }
+ });
+ for (Job j : jobs) {
+ if (__log.isDebugEnabled())
+ __log.debug("todo.enqueue job from db: " + j.jobId + " for " + j.schedDate);
+ if (_todo.size() < _todoLimit)
_todo.enqueue(j);
- }
- } while (jobs.size() == 10);
+ }
return true;
} catch (Exception ex) {
__log.error("Error loading immediate jobs from database.", ex);
@@ -540,9 +561,9 @@
success = doLoadImmediate();
} finally {
if (success)
- _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis() + (long) (_immediateInterval * .75)));
+ _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis() + (long) (_immediateInterval * .90)));
else
- _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis() + 100));
+ _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis() + 1000));
}
}
@@ -576,7 +597,7 @@
try {
success = doUpgrade();
} finally {
- long future = System.currentTimeMillis() + (success ? (long) (_nearFutureInterval * .50) : 100);
+ long future = System.currentTimeMillis() + (success ? (long) (_nearFutureInterval * .50) : 1000);
_nextUpgrade.set(future);
_todo.enqueue(new UpgradeJobsTask(future));
__log.debug("UPGRADE completed, success = " + success + "; next time in " + (future - ctime) + "ms");
@@ -599,8 +620,11 @@
__log.debug("CHECK STALE NODES started");
for (String nodeId : _knownNodes) {
Long lastSeen = _lastHeartBeat.get(nodeId);
- if (lastSeen == null || (System.currentTimeMillis() - lastSeen) > _staleInterval)
+ if ((lastSeen == null || (System.currentTimeMillis() - lastSeen) > _staleInterval)
+ && !_nodeId.equals(nodeId))
+ {
recoverStaleNode(nodeId);
+ }
}
}