You are viewing a plain-text version of this content. The canonical version is linked from the original archive page (the hyperlink was not preserved in this plain-text copy).
Posted to commits@ode.apache.org by bo...@apache.org on 2008/11/15 17:04:47 UTC
svn commit: r717872 - in /ode/trunk/scheduler-simple/src:
main/java/org/apache/ode/scheduler/simple/
test/java/org/apache/ode/scheduler/simple/
Author: boisvert
Date: Sat Nov 15 08:04:47 2008
New Revision: 717872
URL: http://svn.apache.org/viewvc?rev=717872&view=rev
Log:
Merge from Ode 1.x branch:
ODE-425: SimpleScheduler recovery is O(n) with respect to outstanding jobs
ODE-424: SimpleScheduler creates duplicate jobs when job execution fails
Modified:
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerThreadTest.java
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java
Modified: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java?rev=717872&r1=717871&r2=717872&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java (original)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java Sat Nov 15 08:04:47 2008
@@ -51,10 +51,10 @@
private static final String UPDATE_REASSIGN = "update ODE_JOB set nodeid = ?, scheduled = 0 where nodeid = ?";
- private static final String UPGRADE_JOB_DEFAULT = "update ODE_JOB set nodeid = ? where nodeid is null and scheduled = 0 "
+ private static final String UPGRADE_JOB_DEFAULT = "update ODE_JOB set nodeid = ? where nodeid is null "
+ "and mod(ts,?) = ? and ts < ?";
- private static final String UPGRADE_JOB_SQLSERVER = "update ODE_JOB set nodeid = ? where nodeid is null and scheduled = 0 "
+ private static final String UPGRADE_JOB_SQLSERVER = "update ODE_JOB set nodeid = ? where nodeid is null "
+ "and (ts % ?) = ? and ts < ?";
private static final String SAVE_JOB = "insert into ODE_JOB "
@@ -63,7 +63,7 @@
private static final String GET_NODEIDS = "select distinct nodeid from ODE_JOB";
private static final String SCHEDULE_IMMEDIATE = "select jobid, ts, transacted, scheduled, details from ODE_JOB "
- + "where nodeid = ? and scheduled = 0 and ts < ? order by ts";
+ + "where nodeid = ? and ts < ? order by ts";
private static final String UPDATE_SCHEDULED = "update ODE_JOB set scheduled = 1 where jobid in (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
@@ -179,24 +179,6 @@
}
rs.close();
ps.close();
-
- // mark jobs as scheduled, UPDATE_SCHEDULED_SLOTS at a time
- int j = 0;
- int updateCount = 0;
- ps = con.prepareStatement(UPDATE_SCHEDULED);
- for (int updates = 1; updates <= (ret.size() / UPDATE_SCHEDULED_SLOTS) + 1; updates++) {
- for (int i = 1; i <= UPDATE_SCHEDULED_SLOTS; i++) {
- ps.setString(i, j < ret.size() ? ret.get(j).jobId : "");
- j++;
- }
- ps.execute();
- updateCount += ps.getUpdateCount();
- }
- if (updateCount != ret.size()) {
- throw new DatabaseException(
- "Updating scheduled jobs failed to update all jobs; expected=" + ret.size()
- + " actual=" + updateCount);
- }
} catch (SQLException se) {
throw new DatabaseException(se);
} finally {
@@ -304,7 +286,7 @@
d = Dialect.SQLSERVER;
} else if (dbProductName.indexOf("MySQL") >= 0) {
d = Dialect.MYSQL;
- } else if (dbProductName.indexOf("Sybase") >= 0) {
+ } else if (dbProductName.indexOf("Sybase") >= 0 || dbProductName.indexOf("Adaptive") >= 0) {
d = Dialect.SYBASE;
}
}
Modified: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java?rev=717872&r1=717871&r2=717872&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java (original)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java Sat Nov 15 08:04:47 2008
@@ -25,7 +25,7 @@
/**
* Like a task, but a little bit better.
- *
+ *
* @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
*/
class Job extends Task {
@@ -37,7 +37,7 @@
public Job(long when, boolean transacted, Map<String, Object> jobDetail) {
this(when, new GUID().toString(),transacted,jobDetail);
}
-
+
public Job(long when, String jobId, boolean transacted,Map<String, Object> jobDetail) {
super(when);
this.jobId = jobId;
@@ -54,5 +54,9 @@
public boolean equals(Object obj) {
return obj instanceof Job && jobId.equals(((Job) obj).jobId);
}
-
+
+ @Override
+ public String toString() {
+ return "Job "+jobId+" transacted: "+transacted+" persisted: "+persisted+" details: "+detail;
+ }
}
Modified: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java?rev=717872&r1=717871&r2=717872&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java (original)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java Sat Nov 15 08:04:47 2008
@@ -31,168 +31,167 @@
/**
* Implements the "todo" queue and prioritized scheduling mechanism.
- *
+ *
* @author mszefler
* @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
- *
+ *
*/
class SchedulerThread implements Runnable {
- private static final Log __log = LogFactory.getLog(SchedulerThread.class);
+ private static final Log __log = LogFactory.getLog(SchedulerThread.class);
- private static final int TODO_QUEUE_INITIAL_CAPACITY = 200;
+ private static final int TODO_QUEUE_INITIAL_CAPACITY = 200;
- /** Jobs ready for immediate execution. */
- private PriorityBlockingQueue<Task> _todo;
+ /** Jobs ready for immediate execution. */
+ private PriorityBlockingQueue<Task> _todo;
- /** Lock for managing the queue */
- private ReentrantLock _lock = new ReentrantLock();
-
- private Condition _activity = _lock.newCondition();
-
- private volatile boolean _done;
-
- private TaskRunner _taskrunner;
-
- private Thread _thread;
-
- SchedulerThread(TaskRunner runner) {
- _todo = new PriorityBlockingQueue<Task>(TODO_QUEUE_INITIAL_CAPACITY,
- new JobComparatorByDate());
- _taskrunner = runner;
- }
-
- void start() {
- if (_thread != null)
- return;
-
- _done = false;
- _thread = new Thread(this, "OdeScheduler");
- _thread.start();
- }
-
- /**
- * Shutdown the thread.
- */
- void stop() {
- if (_thread == null)
- return;
-
- _done = true;
- _lock.lock();
- try {
- _activity.signal();
- } finally {
- _lock.unlock();
-
- }
-
- while (_thread != null)
- try {
- _thread.join();
- _thread = null;
- } catch (InterruptedException e) {
- ;
- }
-
- }
-
- /**
- * Add a job to the todo queue.
- *
- * @param job
- */
- void enqueue(Task task) {
- _lock.lock();
- try {
- _todo.add(task);
- _activity.signal();
- } finally {
- _lock.unlock();
- }
- }
-
- /**
- * Remove a job to the todo queue.
- *
- * @param job
- */
- void dequeue(Task task) {
- _lock.lock();
- try {
- _todo.remove(task);
- _activity.signal();
- } finally {
- _lock.unlock();
- }
- }
-
-
- /**
- * Get the size of the todo queue.
- *
- * @return
- */
- public int size() {
- return _todo.size();
- }
-
- /**
- * Pop items off the todo queue, and send them to the task runner for processing.
- */
- public void run() {
- while (!_done) {
- _lock.lock();
- try {
- long nextjob;
- while ((nextjob = nextJobTime()) > 0 && !_done)
- _activity.await(nextjob, TimeUnit.MILLISECONDS);
-
- if (!_done && nextjob == 0) {
- Task task = _todo.take();
- _taskrunner.runTask(task);
-
- }
- } catch (InterruptedException ex) {
- ; // ignore
- } finally {
- _lock.unlock();
- }
- }
- }
-
- /**
- * Calculate the time until the next available job.
- *
- * @return time until next job, 0 if one is one is scheduled to go, and some
- * really large number if there are no jobs to speak of
- */
- private long nextJobTime() {
- assert _lock.isLocked();
-
- Task job = _todo.peek();
- if (job == null)
- return Long.MAX_VALUE;
-
- return Math.max(0, job.schedDate - System.currentTimeMillis());
- }
-
- /**
- * Remove the tasks of a given type from the list.
- * @param tasktype type of task
- */
- public void clearTasks(final Class<? extends Task> tasktype) {
- _lock.lock();
- try {
- CollectionsX.remove_if(_todo, new MemberOfFunction<Task>() {
- @Override
- public boolean isMember(Task o) {
- return tasktype.isAssignableFrom(o.getClass());
- }
-
- });
- } finally {
- _lock.unlock();
- }
- }
+ /** Lock for managing the queue */
+ private ReentrantLock _lock = new ReentrantLock();
+
+ private Condition _activity = _lock.newCondition();
+
+ private volatile boolean _done;
+
+ private TaskRunner _taskrunner;
+
+ private Thread _thread;
+
+ SchedulerThread(TaskRunner runner) {
+ _todo = new PriorityBlockingQueue<Task>(TODO_QUEUE_INITIAL_CAPACITY,
+ new JobComparatorByDate());
+ _taskrunner = runner;
+ }
+
+ void start() {
+ if (_thread != null)
+ return;
+
+ _done = false;
+ _thread = new Thread(this, "OdeScheduler");
+ _thread.start();
+ }
+
+ /**
+ * Shutdown the thread.
+ */
+ void stop() {
+ if (_thread == null)
+ return;
+
+ _done = true;
+ _lock.lock();
+ try {
+ _activity.signal();
+ } finally {
+ _lock.unlock();
+
+ }
+
+ while (_thread != null)
+ try {
+ _thread.join();
+ _thread = null;
+ } catch (InterruptedException e) {
+ ;
+ }
+
+ }
+
+ /**
+ * Add a job to the todo queue.
+ *
+ * @param job
+ */
+ void enqueue(Task task) {
+ _lock.lock();
+ try {
+ _todo.add(task);
+ _activity.signal();
+ } finally {
+ _lock.unlock();
+ }
+ }
+
+ /**
+ * Remove a job to the todo queue.
+ *
+ * @param job
+ */
+ void dequeue(Task task) {
+ _lock.lock();
+ try {
+ _todo.remove(task);
+ _activity.signal();
+ } finally {
+ _lock.unlock();
+ }
+ }
+
+ /**
+ * Get the size of the todo queue.
+ *
+ * @return
+ */
+ public int size() {
+ return _todo.size();
+ }
+
+ /**
+ * Pop items off the todo queue, and send them to the task runner for processing.
+ */
+ public void run() {
+ while (!_done) {
+ _lock.lock();
+ try {
+ long nextjob;
+ while ((nextjob = nextJobTime()) > 0 && !_done)
+ _activity.await(nextjob, TimeUnit.MILLISECONDS);
+
+ if (!_done && nextjob == 0) {
+ Task task = _todo.take();
+ _taskrunner.runTask(task);
+
+ }
+ } catch (InterruptedException ex) {
+ ; // ignore
+ } finally {
+ _lock.unlock();
+ }
+ }
+ }
+
+ /**
+ * Calculate the time until the next available job.
+ *
+ * @return time until next job, 0 if one is one is scheduled to go, and some
+ * really large number if there are no jobs to speak of
+ */
+ private long nextJobTime() {
+ assert _lock.isLocked();
+
+ Task job = _todo.peek();
+ if (job == null)
+ return Long.MAX_VALUE;
+
+ return Math.max(0, job.schedDate - System.currentTimeMillis());
+ }
+
+ /**
+ * Remove the tasks of a given type from the list.
+ * @param tasktype type of task
+ */
+ public void clearTasks(final Class<? extends Task> tasktype) {
+ _lock.lock();
+ try {
+ CollectionsX.remove_if(_todo, new MemberOfFunction<Task>() {
+ @Override
+ public boolean isMember(Task o) {
+ return tasktype.isAssignableFrom(o.getClass());
+ }
+
+ });
+ } finally {
+ _lock.unlock();
+ }
+ }
}
Modified: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java?rev=717872&r1=717871&r2=717872&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java (original)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java Sat Nov 15 08:04:47 2008
@@ -23,10 +23,14 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import javax.transaction.Status;
import javax.transaction.Synchronization;
+import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
@@ -36,19 +40,22 @@
import org.apache.ode.bpel.iapi.Scheduler;
/**
- * A reliable and relatively simple scheduler that uses a database to persist information about scheduled tasks.
- *
- * The challange is to achieve high performance in a small memory footprint without loss of reliability while supporting
- * distributed/clustered configurations.
- *
- * The design is based around three time horizons: "immediate", "near future", and "everything else". Immediate jobs (i.e. jobs that
- * are about to be up) are written to the database and kept in an in-memory priority queue. When they execute, they are removed from
- * the database. Near future jobs are placed in the database and assigned to the current node, however they are not stored in
- * memory. Periodically jobs are "upgraded" from near-future to immediate status, at which point they get loaded into memory. Jobs
- * that are further out in time, are placed in the database without a node identifer; when they are ready to be "upgraded" to
- * near-future jobs they are assigned to one of the known live nodes. Recovery is rather straighforward, with stale node identifiers
- * being reassigned to known good nodes.
- *
+ * A reliable and relatively simple scheduler that uses a database to persist information about
+ * scheduled tasks.
+ *
+ * The challenge is to achieve high performance in a small memory footprint without loss of reliability
+ * while supporting distributed/clustered configurations.
+ *
+ * The design is based around three time horizons: "immediate", "near future", and "everything else".
+ * Immediate jobs (i.e. jobs that are about to be up) are written to the database and kept in
+ * an in-memory priority queue. When they execute, they are removed from the database. Near future
+ * jobs are placed in the database and assigned to the current node, however they are not stored in
+ * memory. Periodically jobs are "upgraded" from near-future to immediate status, at which point they
+ * get loaded into memory. Jobs that are further out in time, are placed in the database without a
+ * node identifer; when they are ready to be "upgraded" to near-future jobs they are assigned to one
+ * of the known live nodes. Recovery is rather straighforward, with stale node identifiers being
+ * reassigned to known good nodes.
+ *
* @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
*
*/
@@ -62,7 +69,7 @@
long _immediateInterval = 30000;
/**
- * Jobs sccheduled with a time that is between (now+immediateInterval,now+nearFutureInterval) will be assigned to the current
+ * Jobs scheduled with a time that is between (now+immediateInterval,now+nearFutureInterval) will be assigned to the current
* node, but will not be placed on the todo queue (the promoter will pick them up).
*/
long _nearFutureInterval = 10 * 60 * 1000;
@@ -70,8 +77,17 @@
/** 10s of no communication and you are deemed dead. */
long _staleInterval = 10000;
+ /**
+ * Estimated sustained transaction per second capacity of the system.
+ * e.g. 100 means the system can process 100 jobs per seconds, on average
+ * This number is used to determine how many jobs to load from the database at once.
+ */
+ int _tps = 100;
+
TransactionManager _txm;
+ ExecutorService _exec;
+
String _nodeId;
/** Maximum number of jobs in the "near future" / todo queue. */
@@ -103,10 +119,26 @@
public SimpleScheduler(String nodeId, DatabaseDelegate del, Properties conf) {
_nodeId = nodeId;
_db = del;
- _todoLimit = Integer.parseInt(conf.getProperty("ode.scheduler.queueLength", "10000"));
+ _todoLimit = getIntProperty(conf, "ode.scheduler.queueLength", _todoLimit);
+ _immediateInterval = getLongProperty(conf, "ode.scheduler.immediateInterval", _immediateInterval);
+ _nearFutureInterval = getLongProperty(conf, "ode.scheduler.nearFutureInterval", _nearFutureInterval);
+ _staleInterval = getLongProperty(conf, "ode.scheduler.staleInterval", _staleInterval);
+ _tps = getIntProperty(conf, "ode.scheduler.transactionsPerSecond", _tps);
_todo = new SchedulerThread(this);
}
+ private int getIntProperty(Properties props, String propName, int defaultValue) {
+ String s = props.getProperty(propName);
+ if (s != null) return Integer.parseInt(s);
+ else return defaultValue;
+ }
+
+ private long getLongProperty(Properties props, String propName, long defaultValue) {
+ String s = props.getProperty(propName);
+ if (s != null) return Long.parseLong(s);
+ else return defaultValue;
+ }
+
public void setNodeId(String nodeId) {
_nodeId = nodeId;
}
@@ -123,6 +155,10 @@
_nearFutureInterval = nearFutureInterval;
}
+ public void setTransactionsPerSecond(int tps) {
+ _tps = tps;
+ }
+
public void setTransactionManager(TransactionManager txm) {
_txm = txm;
}
@@ -131,6 +167,10 @@
_db = dbd;
}
+ public void setExecutorService(ExecutorService executorService) {
+ _exec = executorService;
+ }
+
public void cancelJob(String jobId) throws ContextException {
_todo.dequeue(new Job(0, jobId, false, null));
try {
@@ -141,6 +181,20 @@
}
}
+ public <T> Future<T> execIsolatedTransaction(final Callable<T> transaction) throws Exception, ContextException {
+ return _exec.submit(new Callable<T>() {
+ public T call() throws Exception {
+ try {
+ return execTransaction(transaction);
+ } catch (Exception e) {
+ __log.error("An exception occured while executing an isolated transaction, " +
+ "the transaction is going to be abandoned.", e);
+ return null;
+ }
+ }
+ });
+ }
+
public <T> T execTransaction(Callable<T> transaction) throws Exception, ContextException {
try {
if (__log.isDebugEnabled()) __log.debug("Beginning a new transaction");
@@ -167,6 +221,24 @@
}
}
+ public void registerSynchronizer(final Synchronizer synch) throws ContextException {
+ try {
+ _txm.getTransaction().registerSynchronization(new Synchronization() {
+
+ public void beforeCompletion() {
+ synch.beforeCompletion();
+ }
+
+ public void afterCompletion(int status) {
+ synch.afterCompletion(status == Status.STATUS_COMMITTED);
+ }
+
+ });
+ } catch (Exception e) {
+ throw new ContextException("Unable to register synchronizer.", e);
+ }
+ }
+
public String schedulePersistedJob(final Map<String, Object> jobDetail, Date when) throws ContextException {
long ctime = System.currentTimeMillis();
if (when == null)
@@ -182,25 +254,20 @@
try {
if (immediate) {
- // If we have too many jobs in the queue, we don't allow any new ones
- if (_todo.size() > _todoLimit) {
- __log.error("The execution queue is backed up, the engine can't keep up with the load. Either " +
- "increase the queue size or regulate the flow.");
- return null;
- }
-
// Immediate scheduling means we put it in the DB for safe keeping
_db.insertJob(job, _nodeId, true);
+
// And add it to our todo list .
- addTodoOnCommit(job);
-
+ if (_todo.size() < _todoLimit) {
+ addTodoOnCommit(job);
+ }
__log.debug("scheduled immediate job: " + job.jobId);
} else if (nearfuture) {
// Near future, assign the job to ourselves (why? -- this makes it very unlikely that we
// would get two nodes trying to process the same instance, which causes unsightly rollbacks).
_db.insertJob(job, _nodeId, false);
__log.debug("scheduled near-future job: " + job.jobId);
- } else /* far future */{
+ } else /* far future */ {
// Not the near future, we don't assign a node-id, we'll assign it later.
_db.insertJob(job, null, false);
__log.debug("scheduled far-future job: " + job.jobId);
@@ -235,6 +302,9 @@
if (_running)
return;
+ if (_exec == null)
+ _exec = Executors.newCachedThreadPool();
+
_todo.clearTasks(UpgradeJobsTask.class);
_todo.clearTasks(LoadImmediateTask.class);
_todo.clearTasks(CheckStaleNodes.class);
@@ -255,23 +325,28 @@
throw new ContextException("Error retrieving node list.", ex);
}
+ long now = System.currentTimeMillis();
+
// Pretend we got a heartbeat...
- for (String s : _knownNodes)
- _lastHeartBeat.put(s, System.currentTimeMillis());
+ for (String s : _knownNodes) _lastHeartBeat.put(s, now);
// schedule immediate job loading for now!
- _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis()));
+ _todo.enqueue(new LoadImmediateTask(now));
// schedule check for stale nodes, make it random so that the nodes don't overlap.
- _todo.enqueue(new CheckStaleNodes(System.currentTimeMillis() + (long) (_random.nextDouble() * _staleInterval)));
+ _todo.enqueue(new CheckStaleNodes(now + randomMean(_staleInterval)));
// do the upgrade sometime (random) in the immediate interval.
- _todo.enqueue(new UpgradeJobsTask(System.currentTimeMillis() + (long) (_random.nextDouble() * _immediateInterval)));
+ _todo.enqueue(new UpgradeJobsTask(now + randomMean(_immediateInterval)));
_todo.start();
_running = true;
}
-
+
+ private long randomMean(long mean) {
+ return (long) _random.nextDouble() * mean + (mean/2);
+ }
+
public synchronized void stop() {
if (!_running)
return;
@@ -283,28 +358,6 @@
_running = false;
}
- public void jobCompleted(String jobId) {
- boolean deleted = false;
- try {
- deleted = _db.deleteJob(jobId, _nodeId);
- } catch (DatabaseException de) {
- String errmsg = "Database error.";
- __log.error(errmsg, de);
- throw new ContextException(errmsg, de);
- }
-
- if (!deleted) {
- try {
- _txm.getTransaction().setRollbackOnly();
- } catch (Exception ex) {
- __log.error("Transaction manager error; setRollbackOnly() failed.", ex);
- }
-
- throw new ContextException("Job no longer in database: jobId=" + jobId);
- }
- }
-
-
/**
* Run a job in the current thread.
*
@@ -315,53 +368,70 @@
final Scheduler.JobInfo jobInfo = new Scheduler.JobInfo(job.jobId, job.detail,
(Integer)(job.detail.get("retry") != null ? job.detail.get("retry") : 0));
- try {
- try {
- _jobProcessor.onScheduledJob(jobInfo);
- } catch (JobProcessorException jpe) {
- if (jpe.retry)
- __log.error("Error while processing transaction, retrying in " + doRetry(job) + "s");
- else
- __log.error("Error while processing transaction, no retry.", jpe);
+ _exec.submit(new Callable<Void>() {
+ public Void call() throws Exception {
+ if (job.transacted) {
+ try {
+ execTransaction(new Callable<Void>() {
+ public Void call() throws Exception {
+ if (job.persisted)
+ if (!_db.deleteJob(job.jobId, _nodeId))
+ throw new JobNoLongerInDbException(job.jobId,_nodeId);
+
+ try {
+ _jobProcessor.onScheduledJob(jobInfo);
+ } catch (JobProcessorException jpe) {
+ if (jpe.retry) {
+ int retry = job.detail.get("retry") != null ? (((Integer)job.detail.get("retry")) + 1) : 0;
+ if (retry <= 10) {
+ long delay = doRetry(job);
+ __log.error("Error while processing transaction, retrying in " + delay + "s");
+ } else {
+ __log.error("Error while processing transaction after 10 retries, no more retries:"+job);
+ }
+ } else {
+ __log.error("Error while processing transaction, no retry.", jpe);
+ }
+ }
+ return null;
+ }
+ });
+ } catch (JobNoLongerInDbException jde) {
+ // This may happen if two node try to do the same job... we try to avoid
+ // it the synchronization is a best-effort but not perfect.
+ __log.debug("job no longer in db forced rollback.");
+ } catch (Exception ex) {
+ __log.error("Error while executing transaction", ex);
+ }
+ } else {
+ _jobProcessor.onScheduledJob(jobInfo);
+ }
+ return null;
}
- } catch (Exception ex) {
- __log.error("Error in scheduler processor.", ex);
- }
-
+ });
}
private void addTodoOnCommit(final Job job) {
+ registerSynchronizer(new Synchronizer() {
- Transaction tx;
- try {
- tx = _txm.getTransaction();
- } catch (Exception ex) {
- String errmsg = "Transaction manager error; unable to obtain transaction.";
- __log.error(errmsg, ex);
- throw new ContextException(errmsg, ex);
- }
-
- if (tx == null)
- throw new ContextException("Missing required transaction in thread " + Thread.currentThread());
-
- try {
- tx.registerSynchronization(new Synchronization() {
-
- public void afterCompletion(int status) {
- if (status == Status.STATUS_COMMITTED) {
- _todo.enqueue(job);
- }
+ public void afterCompletion(boolean success) {
+ if (success) {
+ _todo.enqueue(job);
}
+ }
- public void beforeCompletion() {
- }
+ public void beforeCompletion() {
+ }
- });
+ });
+ }
- } catch (Exception e) {
- String errmsg = "Unable to registrer synchronizer. ";
- __log.error(errmsg, e);
- throw new ContextException(errmsg, e);
+ public boolean isTransacted() {
+ try {
+ Transaction tx = _txm.getTransaction();
+ return (tx != null && tx.getStatus() != Status.STATUS_NO_TRANSACTION);
+ } catch (SystemException e) {
+ throw new ContextException("Internal Error: Could not obtain transaction status.");
}
}
@@ -385,21 +455,25 @@
boolean doLoadImmediate() {
__log.debug("LOAD IMMEDIATE started");
+
+ // don't load anything if we're already half-full; we've got plenty to do already
+ if (_todo.size() > _todoLimit/2) return true;
+
List<Job> jobs;
try {
- do {
- jobs = execTransaction(new Callable<List<Job>>() {
- public List<Job> call() throws Exception {
- return _db.dequeueImmediate(_nodeId, System.currentTimeMillis() + _immediateInterval, 10);
- }
- });
- for (Job j : jobs) {
- if (__log.isDebugEnabled())
- __log.debug("todo.enqueue job from db: " + j.jobId + " for " + j.schedDate);
+ final int batch = (int) (_immediateInterval * _tps / 1000);
+ jobs = execTransaction(new Callable<List<Job>>() {
+ public List<Job> call() throws Exception {
+ return _db.dequeueImmediate(_nodeId, System.currentTimeMillis() + _immediateInterval, batch);
+ }
+ });
+ for (Job j : jobs) {
+ if (__log.isDebugEnabled())
+ __log.debug("todo.enqueue job from db: " + j.jobId + " for " + j.schedDate);
+ if (_todo.size() < _todoLimit)
_todo.enqueue(j);
- }
- } while (jobs.size() == 10);
+ }
return true;
} catch (Exception ex) {
__log.error("Error loading immediate jobs from database.", ex);
@@ -447,7 +521,6 @@
/**
* Re-assign stale node's jobs to self.
- *
* @param nodeId
*/
void recoverStaleNode(final String nodeId) {
@@ -476,6 +549,7 @@
} finally {
__log.debug("node recovery complete");
}
+
}
private long doRetry(Job job) throws DatabaseException {
@@ -507,9 +581,9 @@
success = doLoadImmediate();
} finally {
if (success)
- _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis() + (long) (_immediateInterval * .75)));
+ _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis() + (long) (_immediateInterval * .90)));
else
- _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis() + 100));
+ _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis() + 1000));
}
}
@@ -517,7 +591,6 @@
/**
* Upgrade jobs from far future to immediate future (basically, assign them to a node).
- *
* @author mszefler
*
*/
@@ -544,7 +617,7 @@
try {
success = doUpgrade();
} finally {
- long future = System.currentTimeMillis() + (success ? (long) (_nearFutureInterval * .50) : 100);
+ long future = System.currentTimeMillis() + (success ? (long) (_nearFutureInterval * .50) : 1000);
_nextUpgrade.set(future);
_todo.enqueue(new UpgradeJobsTask(future));
__log.debug("UPGRADE completed, success = " + success + "; next time in " + (future - ctime) + "ms");
@@ -567,11 +640,16 @@
__log.debug("CHECK STALE NODES started");
for (String nodeId : _knownNodes) {
Long lastSeen = _lastHeartBeat.get(nodeId);
- if (lastSeen == null || (System.currentTimeMillis() - lastSeen) > _staleInterval)
+ if ((lastSeen == null || (System.currentTimeMillis() - lastSeen) > _staleInterval)
+ && !_nodeId.equals(nodeId))
+ {
recoverStaleNode(nodeId);
+ }
}
}
+
}
+
}
Modified: ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java?rev=717872&r1=717871&r2=717872&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java (original)
+++ ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java Sat Nov 15 08:04:47 2008
@@ -43,7 +43,7 @@
_ds = new DelegateSupport();
_del = _ds.delegate();
}
-
+
public void testGetNodeIds() throws Exception {
// should have no node ids in the db, empty list (not null)
@@ -91,8 +91,9 @@
assertEquals("j1",jobs.get(0).jobId);
jobs = _del.dequeueImmediate("n1", 250L, 1000);
assertNotNull(jobs);
- assertEquals(1, jobs.size());
- assertEquals("j2",jobs.get(0).jobId);
+ assertEquals(2, jobs.size());
+ assertEquals("j1",jobs.get(0).jobId);
+ assertEquals("j2",jobs.get(1).jobId);
}
public void testScheduleImmediateMaxRows() throws Exception {
@@ -103,10 +104,6 @@
assertNotNull(jobs);
assertEquals(1, jobs.size());
assertEquals("j1",jobs.get(0).jobId);
- jobs = _del.dequeueImmediate("n1", 250L, 1000);
- assertNotNull(jobs);
- assertEquals(1, jobs.size());
- assertEquals("j2",jobs.get(0).jobId);
}
public void testScheduleImmediateNodeFilter() throws Exception {
Modified: ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java?rev=717872&r1=717871&r2=717872&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java (original)
+++ ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java Sat Nov 15 08:04:47 2008
@@ -51,7 +51,7 @@
public void onScheduledJob(Scheduler.JobInfo jobInfo) throws Scheduler.JobProcessorException {
_tried++;
- throw new Scheduler.JobProcessorException(jobInfo.retryCount < 2);
+ throw new Scheduler.JobProcessorException(jobInfo.retryCount < 3);
}
Map<String, Object> newDetail(String x) {
Modified: ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerThreadTest.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerThreadTest.java?rev=717872&r1=717871&r2=717872&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerThreadTest.java (original)
+++ ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerThreadTest.java Sat Nov 15 08:04:47 2008
@@ -52,7 +52,7 @@
_st.start();
long schedtime = System.currentTimeMillis() + 300;
_st.enqueue(new Task(schedtime));
- Thread.sleep(600);
+ Thread.sleep(1000);
assertEquals(1,_tasks.size());
assertTrue(_tasks.get(0).time < schedtime + SCHED_TOLERANCE / 2);
assertTrue(_tasks.get(0).time > schedtime - SCHED_TOLERANCE / 2);
Modified: ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java?rev=717872&r1=717871&r2=717872&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java (original)
+++ ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java Sat Nov 15 08:04:47 2008
@@ -21,29 +21,34 @@
import java.util.*;
+import javax.transaction.RollbackException;
+import javax.transaction.Status;
+import javax.transaction.Synchronization;
+import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import junit.framework.TestCase;
-import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
import org.apache.ode.bpel.iapi.Scheduler.JobProcessor;
import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException;
+import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
public class SimpleSchedulerTest extends TestCase implements JobProcessor {
DelegateSupport _ds;
SimpleScheduler _scheduler;
ArrayList<JobInfo> _jobs;
+ ArrayList<JobInfo> _commit;
TransactionManager _txm;
-
public void setUp() throws Exception {
_txm = new GeronimoTransactionManager();
_ds = new DelegateSupport();
_scheduler = newScheduler("n1");
_jobs = new ArrayList<JobInfo>(100);
+ _commit = new ArrayList<JobInfo>(100);
}
public void tearDown() throws Exception {
@@ -52,21 +57,25 @@
public void testConcurrentExec() throws Exception {
_scheduler.start();
- _txm.begin();
- String jobId;
- try {
- jobId = _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 100));
- Thread.sleep(200);
- // Make sure we don't schedule until commit.
- assertEquals(0, _jobs.size());
- } finally {
- _txm.commit();
+ for (int i=0; i<10; i++) {
+ _txm.begin();
+ String jobId;
+ try {
+ int jobs = _jobs.size();
+ jobId = _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 200));
+ Thread.sleep(100);
+ // Make sure we don't schedule until commit.
+ assertEquals(jobs, _jobs.size());
+ } finally {
+ _txm.commit();
+ }
+ // Delete from DB
+ assertEquals(true,_ds.delegate().deleteJob(jobId, "n1"));
+ // Wait for the job to be execed.
+ Thread.sleep(250);
+ // We should always have same number of jobs/commits
+ assertEquals(_jobs.size(), _commit.size());
}
- // Wait for the job to be execed.
- Thread.sleep(100);
- // Should execute job,
- assertEquals(1, _jobs.size());
-
}
public void testImmediateScheduling() throws Exception {
@@ -107,102 +116,124 @@
public void testNearFutureScheduling() throws Exception {
// speed things up a bit to hit the right code paths
- _scheduler.setNearFutureInterval(1000);
- _scheduler.setImmediateInterval(500);
+ _scheduler.setNearFutureInterval(10000);
+ _scheduler.setImmediateInterval(5000);
_scheduler.start();
_txm.begin();
try {
- _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 750));
+ _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 7500));
} finally {
_txm.commit();
}
- Thread.sleep(850);
+ Thread.sleep(8500);
assertEquals(1, _jobs.size());
}
public void testFarFutureScheduling() throws Exception {
// speed things up a bit to hit the right code paths
- _scheduler.setNearFutureInterval(700);
- _scheduler.setImmediateInterval(300);
+ _scheduler.setNearFutureInterval(7000);
+ _scheduler.setImmediateInterval(3000);
_scheduler.start();
_txm.begin();
try {
- _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 750));
+ _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 7500));
} finally {
_txm.commit();
}
- Thread.sleep(850);
+ Thread.sleep(8500);
assertEquals(1, _jobs.size());
}
public void testRecovery() throws Exception {
// speed things up a bit to hit the right code paths
- _scheduler.setNearFutureInterval(200);
- _scheduler.setImmediateInterval(100);
- _scheduler.setStaleInterval(50);
+ _scheduler.setNearFutureInterval(2000);
+ _scheduler.setImmediateInterval(1000);
+ _scheduler.setStaleInterval(500);
_txm.begin();
try {
_scheduler.schedulePersistedJob(newDetail("immediate"), new Date(System.currentTimeMillis()));
- _scheduler.schedulePersistedJob(newDetail("near"), new Date(System.currentTimeMillis() + 110));
- _scheduler.schedulePersistedJob(newDetail("far"), new Date(System.currentTimeMillis() + 250));
+ _scheduler.schedulePersistedJob(newDetail("near"), new Date(System.currentTimeMillis() + 1100));
+ _scheduler.schedulePersistedJob(newDetail("far"), new Date(System.currentTimeMillis() + 2500));
} finally {
_txm.commit();
}
_scheduler = newScheduler("n3");
- _scheduler.setNearFutureInterval(200);
- _scheduler.setImmediateInterval(100);
- _scheduler.setStaleInterval(50);
+ _scheduler.setNearFutureInterval(2000);
+ _scheduler.setImmediateInterval(1000);
+ _scheduler.setStaleInterval(1000);
_scheduler.start();
- Thread.sleep(400);
+ Thread.sleep(4000);
assertEquals(3, _jobs.size());
}
public void testRecoverySuppressed() throws Exception {
// speed things up a bit to hit the right code paths
- _scheduler.setNearFutureInterval(200);
- _scheduler.setImmediateInterval(100);
- _scheduler.setStaleInterval(50);
+ _scheduler.setNearFutureInterval(2000);
+ _scheduler.setImmediateInterval(1000);
+ _scheduler.setStaleInterval(500);
- // schedule some jobs ...
_txm.begin();
try {
_scheduler.schedulePersistedJob(newDetail("immediate"), new Date(System.currentTimeMillis()));
- _scheduler.schedulePersistedJob(newDetail("near"), new Date(System.currentTimeMillis() + 150));
- _scheduler.schedulePersistedJob(newDetail("far"), new Date(System.currentTimeMillis() + 250));
+ _scheduler.schedulePersistedJob(newDetail("near"), new Date(System.currentTimeMillis() + 1100));
+ _scheduler.schedulePersistedJob(newDetail("far"), new Date(System.currentTimeMillis() + 15000));
} finally {
_txm.commit();
- }
+ }
+ _scheduler.stop();
- // but don't start the scheduler....
-
- // create a second node for the scheduler.
- SimpleScheduler scheduler = newScheduler("n3");
- scheduler.setNearFutureInterval(200);
- scheduler.setImmediateInterval(100);
- scheduler.setStaleInterval(50);
- scheduler.start();
+ _scheduler = newScheduler("n3");
+ _scheduler.setNearFutureInterval(2000);
+ _scheduler.setImmediateInterval(1000);
+ _scheduler.setStaleInterval(1000);
+ _scheduler.start();
for (int i = 0; i < 40; ++i) {
- scheduler.updateHeartBeat("n1");
- Thread.sleep(10);
+ _scheduler.updateHeartBeat("n1");
+ Thread.sleep(100);
}
- scheduler.stop();
+ _scheduler.stop();
+ Thread.sleep(1000);
- assertTrue(_jobs.size() <= 1);
- if (_jobs.size() == 1)
- assertEquals("far", _jobs.get(0).jobDetail.get("foo"));
+ assertEquals(0, _jobs.size());
}
public void onScheduledJob(final JobInfo jobInfo) throws JobProcessorException {
synchronized (_jobs) {
_jobs.add(jobInfo);
}
+
+ try {
+ _txm.getTransaction().registerSynchronization(new Synchronization() {
+
+ public void afterCompletion(int arg0) {
+ if (arg0 == Status.STATUS_COMMITTED)
+ _commit.add(jobInfo);
+ }
+
+ public void beforeCompletion() {
+ // TODO Auto-generated method stub
+
+ }
+
+ });
+ } catch (IllegalStateException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (RollbackException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } catch (SystemException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
}
Map<String, Object> newDetail(String x) {