You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by bo...@apache.org on 2008/11/15 17:04:47 UTC

svn commit: r717872 - in /ode/trunk/scheduler-simple/src: main/java/org/apache/ode/scheduler/simple/ test/java/org/apache/ode/scheduler/simple/

Author: boisvert
Date: Sat Nov 15 08:04:47 2008
New Revision: 717872

URL: http://svn.apache.org/viewvc?rev=717872&view=rev
Log:
Merge from Ode 1.x branch:
ODE-425: SimpleScheduler recovery is O(n) with respect to outstanding jobs
ODE-424: SimpleScheduler creates duplicate jobs when job execution fails


Modified:
    ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java
    ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java
    ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java
    ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
    ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java
    ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java
    ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerThreadTest.java
    ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java

Modified: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java?rev=717872&r1=717871&r2=717872&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java (original)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java Sat Nov 15 08:04:47 2008
@@ -51,10 +51,10 @@
 
     private static final String UPDATE_REASSIGN = "update ODE_JOB set nodeid = ?, scheduled = 0 where nodeid = ?";
 
-    private static final String UPGRADE_JOB_DEFAULT = "update ODE_JOB set nodeid = ? where nodeid is null and scheduled = 0 "
+    private static final String UPGRADE_JOB_DEFAULT = "update ODE_JOB set nodeid = ? where nodeid is null "
             + "and mod(ts,?) = ? and ts < ?";
 
-    private static final String UPGRADE_JOB_SQLSERVER = "update ODE_JOB set nodeid = ? where nodeid is null and scheduled = 0 "
+    private static final String UPGRADE_JOB_SQLSERVER = "update ODE_JOB set nodeid = ? where nodeid is null "
         + "and (ts % ?) = ? and ts < ?";
 
     private static final String SAVE_JOB = "insert into ODE_JOB "
@@ -63,7 +63,7 @@
     private static final String GET_NODEIDS = "select distinct nodeid from ODE_JOB";
 
     private static final String SCHEDULE_IMMEDIATE = "select jobid, ts, transacted, scheduled, details from ODE_JOB "
-            + "where nodeid = ? and scheduled = 0 and ts < ? order by ts";
+            + "where nodeid = ? and ts < ? order by ts";
 
     private static final String UPDATE_SCHEDULED = "update ODE_JOB set scheduled = 1 where jobid in (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
 
@@ -179,24 +179,6 @@
             }
             rs.close();
             ps.close();
-            
-            // mark jobs as scheduled, UPDATE_SCHEDULED_SLOTS at a time
-            int j = 0;
-            int updateCount = 0;
-            ps = con.prepareStatement(UPDATE_SCHEDULED);
-            for (int updates = 1; updates <= (ret.size() / UPDATE_SCHEDULED_SLOTS) + 1; updates++) {
-                for (int i = 1; i <= UPDATE_SCHEDULED_SLOTS; i++) {
-                    ps.setString(i, j < ret.size() ? ret.get(j).jobId : "");
-                    j++;
-                }
-                ps.execute();
-                updateCount += ps.getUpdateCount();
-            }
-            if (updateCount != ret.size()) {
-                throw new DatabaseException(
-                        "Updating scheduled jobs failed to update all jobs; expected=" + ret.size()
-                                + " actual=" + updateCount);
-            }
         } catch (SQLException se) {
             throw new DatabaseException(se);
         } finally {
@@ -304,7 +286,7 @@
                     d = Dialect.SQLSERVER;
                 } else if (dbProductName.indexOf("MySQL") >= 0) {
                     d = Dialect.MYSQL;
-                } else if (dbProductName.indexOf("Sybase") >= 0) {
+                } else if (dbProductName.indexOf("Sybase") >= 0 || dbProductName.indexOf("Adaptive") >= 0) {
                     d = Dialect.SYBASE;
                 }
             }

Modified: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java?rev=717872&r1=717871&r2=717872&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java (original)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java Sat Nov 15 08:04:47 2008
@@ -25,7 +25,7 @@
 
 /**
  * Like a task, but a little bit better.
- *
+ * 
  * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
  */
 class Job extends Task {
@@ -37,7 +37,7 @@
     public Job(long when, boolean transacted, Map<String, Object> jobDetail) {
         this(when, new GUID().toString(),transacted,jobDetail);
     }
-
+    
     public Job(long when, String jobId, boolean transacted,Map<String, Object> jobDetail) {
         super(when);
         this.jobId = jobId;
@@ -54,5 +54,9 @@
     public boolean equals(Object obj) {
         return obj instanceof Job && jobId.equals(((Job) obj).jobId);
     }
-
+    
+    @Override
+    public String toString() {
+        return "Job "+jobId+" transacted: "+transacted+" persisted: "+persisted+" details: "+detail;
+    }
 }

Modified: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java?rev=717872&r1=717871&r2=717872&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java (original)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java Sat Nov 15 08:04:47 2008
@@ -31,168 +31,167 @@
 
 /**
  * Implements the "todo" queue and prioritized scheduling mechanism. 
- *
+ * 
  * @author mszefler
  * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
- *
+ * 
  */
 class SchedulerThread implements Runnable {
 
-    private static final Log __log = LogFactory.getLog(SchedulerThread.class);
+	private static final Log __log = LogFactory.getLog(SchedulerThread.class);
 
-    private static final int TODO_QUEUE_INITIAL_CAPACITY = 200;
+	private static final int TODO_QUEUE_INITIAL_CAPACITY = 200;
 
-    /** Jobs ready for immediate execution. */
-    private PriorityBlockingQueue<Task> _todo;
+	/** Jobs ready for immediate execution. */
+	private PriorityBlockingQueue<Task> _todo;
 
-    /** Lock for managing the queue */
-    private ReentrantLock _lock = new ReentrantLock();
-
-    private Condition _activity = _lock.newCondition();
-
-    private volatile boolean _done;
-
-    private TaskRunner _taskrunner;
-
-    private Thread _thread;
-
-    SchedulerThread(TaskRunner runner) {
-        _todo = new PriorityBlockingQueue<Task>(TODO_QUEUE_INITIAL_CAPACITY,
-                new JobComparatorByDate());
-        _taskrunner = runner;
-    }
-
-    void start() {
-        if (_thread != null)
-            return;
-
-        _done = false;
-        _thread = new Thread(this, "OdeScheduler");
-        _thread.start();
-    }
-
-    /**
-     * Shutdown the thread.
-     */
-    void stop() {
-        if (_thread == null)
-            return;
-
-        _done = true;
-        _lock.lock();
-        try {
-            _activity.signal();
-        } finally {
-            _lock.unlock();
-
-        }
-
-        while (_thread != null)
-            try {
-                _thread.join();
-                _thread = null;
-            } catch (InterruptedException e) {
-                ;
-            }
-
-    }
-
-    /**
-     * Add a job to the todo queue.
-     *
-     * @param job
-     */
-    void enqueue(Task task) {
-        _lock.lock();
-        try {
-            _todo.add(task);
-            _activity.signal();
-        } finally {
-            _lock.unlock();
-        }
-    }
-
-    /**
-     * Remove a job to the todo queue.
-     *
-     * @param job
-     */
-    void dequeue(Task task) {
-        _lock.lock();
-        try {
-            _todo.remove(task);
-            _activity.signal();
-        } finally {
-            _lock.unlock();
-        }
-    }
-
-
-    /**
-     * Get the size of the todo queue.
-     *
-     * @return
-     */
-    public int size() {
-        return _todo.size();
-    }
-
-    /**
-     * Pop items off the todo queue, and send them to the task runner for processing.
-     */
-    public void run() {
-        while (!_done) {
-            _lock.lock();
-            try {
-                long nextjob;
-                while ((nextjob = nextJobTime()) > 0 && !_done)
-                    _activity.await(nextjob, TimeUnit.MILLISECONDS);
-
-                if (!_done && nextjob == 0) {
-                    Task task = _todo.take();
-                    _taskrunner.runTask(task);
-
-                }
-            } catch (InterruptedException ex) {
-                ; // ignore
-            } finally {
-                _lock.unlock();
-            }
-        }
-    }
-
-    /**
-     * Calculate the time until the next available job.
-     *
-     * @return time until next job, 0 if one is one is scheduled to go, and some
-     *         really large number if there are no jobs to speak of
-     */
-    private long nextJobTime() {
-        assert _lock.isLocked();
-
-        Task job = _todo.peek();
-        if (job == null)
-            return Long.MAX_VALUE;
-
-        return Math.max(0, job.schedDate - System.currentTimeMillis());
-    }
-
-    /**
-     * Remove the tasks of a given type from the list.
-     * @param tasktype type of task
-     */
-    public void clearTasks(final Class<? extends Task> tasktype) {
-        _lock.lock();
-        try {
-            CollectionsX.remove_if(_todo, new MemberOfFunction<Task>() {
-                @Override
-                public boolean isMember(Task o) {
-                    return tasktype.isAssignableFrom(o.getClass());
-                }
-
-            });
-        } finally {
-            _lock.unlock();
-        }
-    }
+	/** Lock for managing the queue */
+	private ReentrantLock _lock = new ReentrantLock();
+
+	private Condition _activity = _lock.newCondition();
+
+	private volatile boolean _done;
+
+	private TaskRunner _taskrunner;
+
+	private Thread _thread;
+
+	SchedulerThread(TaskRunner runner) {
+		_todo = new PriorityBlockingQueue<Task>(TODO_QUEUE_INITIAL_CAPACITY,
+				new JobComparatorByDate());
+		_taskrunner = runner;
+	}
+
+	void start() {
+		if (_thread != null)
+			return;
+
+		_done = false;
+		_thread = new Thread(this, "OdeScheduler");
+		_thread.start();
+	}
+
+	/**
+	 * Shutdown the thread.
+	 */
+	void stop() {
+		if (_thread == null)
+			return;
+
+		_done = true;
+		_lock.lock();
+		try {
+			_activity.signal();
+		} finally {
+			_lock.unlock();
+
+		}
+
+		while (_thread != null)
+			try {
+				_thread.join();
+				_thread = null;
+			} catch (InterruptedException e) {
+				;
+			}
+
+	}
+
+	/**
+	 * Add a job to the todo queue.
+	 * 
+	 * @param job
+	 */
+	void enqueue(Task task) {
+		_lock.lock();
+		try {
+			_todo.add(task);
+			_activity.signal();
+		} finally {
+			_lock.unlock();
+		}
+	}
+
+	/**
+	 * Remove a job from the todo queue.
+	 *
+	 * @param task the task to remove
+	 */
+	void dequeue(Task task) {
+		_lock.lock();
+		try {
+			_todo.remove(task);
+			_activity.signal();
+		} finally {
+			_lock.unlock();
+		}
+	}
+
+	/**
+	 * Get the size of the todo queue.
+	 * 
+	 * @return
+	 */
+	public int size() {
+		return _todo.size();
+	}
+
+	/**
+	 * Pop items off the todo queue, and send them to the task runner for processing.
+	 */
+	public void run() {
+		while (!_done) {
+			_lock.lock();
+			try {
+				long nextjob;
+				while ((nextjob = nextJobTime()) > 0 && !_done)
+					_activity.await(nextjob, TimeUnit.MILLISECONDS);
+
+				if (!_done && nextjob == 0) {
+					Task task = _todo.take();
+					_taskrunner.runTask(task);
+
+				}
+			} catch (InterruptedException ex) {
+				; // ignore
+			} finally {
+				_lock.unlock();
+			}
+		}
+	}
+
+	/**
+	 * Calculate the time until the next available job.
+	 * 
+	 * @return time until next job, 0 if one is scheduled to go, and some
+	 *         really large number if there are no jobs to speak of
+	 */
+	private long nextJobTime() {
+		assert _lock.isLocked();
+
+		Task job = _todo.peek();
+		if (job == null)
+			return Long.MAX_VALUE;
+
+		return Math.max(0, job.schedDate - System.currentTimeMillis());
+	}
+
+	/**
+	 * Remove the tasks of a given type from the list. 
+	 * @param tasktype type of task
+	 */
+	public void clearTasks(final Class<? extends Task> tasktype) {
+		_lock.lock();
+		try {
+			CollectionsX.remove_if(_todo, new MemberOfFunction<Task>() {
+				@Override
+				public boolean isMember(Task o) {
+					return tasktype.isAssignableFrom(o.getClass());
+				}
+
+			});
+		} finally {
+			_lock.unlock();
+		}
+	}
 }

Modified: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java?rev=717872&r1=717871&r2=717872&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java (original)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java Sat Nov 15 08:04:47 2008
@@ -23,10 +23,14 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.transaction.Status;
 import javax.transaction.Synchronization;
+import javax.transaction.SystemException;
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
 
@@ -36,19 +40,22 @@
 import org.apache.ode.bpel.iapi.Scheduler;
 
 /**
- * A reliable and relatively simple scheduler that uses a database to persist information about scheduled tasks.
- *
- * The challange is to achieve high performance in a small memory footprint without loss of reliability while supporting
- * distributed/clustered configurations.
- *
- * The design is based around three time horizons: "immediate", "near future", and "everything else". Immediate jobs (i.e. jobs that
- * are about to be up) are written to the database and kept in an in-memory priority queue. When they execute, they are removed from
- * the database. Near future jobs are placed in the database and assigned to the current node, however they are not stored in
- * memory. Periodically jobs are "upgraded" from near-future to immediate status, at which point they get loaded into memory. Jobs
- * that are further out in time, are placed in the database without a node identifer; when they are ready to be "upgraded" to
- * near-future jobs they are assigned to one of the known live nodes. Recovery is rather straighforward, with stale node identifiers
- * being reassigned to known good nodes.
- *
+ * A reliable and relatively simple scheduler that uses a database to persist information about 
+ * scheduled tasks.
+ * 
+ * The challenge is to achieve high performance in a small memory footprint without loss of reliability
+ * while supporting distributed/clustered configurations.
+ * 
+ * The design is based around three time horizons: "immediate", "near future", and "everything else". 
+ * Immediate jobs (i.e. jobs that are about to be up) are written to the database and kept in
+ * an in-memory priority queue. When they execute, they are removed from the database. Near future
+ * jobs are placed in the database and assigned to the current node, however they are not stored in
+ * memory. Periodically jobs are "upgraded" from near-future to immediate status, at which point they
+ * get loaded into memory. Jobs that are further out in time, are placed in the database without a 
+ * node identifier; when they are ready to be "upgraded" to near-future jobs they are assigned to one
+ * of the known live nodes. Recovery is rather straightforward, with stale node identifiers being 
+ * reassigned to known good nodes.       
+ * 
  * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
  *
  */
@@ -62,7 +69,7 @@
     long _immediateInterval = 30000;
 
     /**
-     * Jobs sccheduled with a time that is between (now+immediateInterval,now+nearFutureInterval) will be assigned to the current
+     * Jobs scheduled with a time that is between (now+immediateInterval,now+nearFutureInterval) will be assigned to the current
      * node, but will not be placed on the todo queue (the promoter will pick them up).
      */
     long _nearFutureInterval = 10 * 60 * 1000;
@@ -70,8 +77,17 @@
     /** 10s of no communication and you are deemed dead. */
     long _staleInterval = 10000;
 
+    /**
+     * Estimated sustained transaction per second capacity of the system.
+     * e.g. 100 means the system can process 100 jobs per second, on average.
+     * This number is used to determine how many jobs to load from the database at once.
+     */
+    int _tps = 100;
+
     TransactionManager _txm;
 
+    ExecutorService _exec;
+
     String _nodeId;
 
     /** Maximum number of jobs in the "near future" / todo queue. */
@@ -103,10 +119,26 @@
     public SimpleScheduler(String nodeId, DatabaseDelegate del, Properties conf) {
         _nodeId = nodeId;
         _db = del;
-        _todoLimit = Integer.parseInt(conf.getProperty("ode.scheduler.queueLength", "10000"));
+        _todoLimit = getIntProperty(conf, "ode.scheduler.queueLength", _todoLimit);
+        _immediateInterval = getLongProperty(conf, "ode.scheduler.immediateInterval", _immediateInterval);
+        _nearFutureInterval = getLongProperty(conf, "ode.scheduler.nearFutureInterval", _nearFutureInterval);
+        _staleInterval = getLongProperty(conf, "ode.scheduler.staleInterval", _staleInterval);
+        _tps = getIntProperty(conf, "ode.scheduler.transactionsPerSecond", _tps);
         _todo = new SchedulerThread(this);
     }
 
+    private int getIntProperty(Properties props, String propName, int defaultValue) {
+        String s = props.getProperty(propName);
+        if (s != null) return Integer.parseInt(s);
+        else return defaultValue;
+    }
+
+    private long getLongProperty(Properties props, String propName, long defaultValue) {
+        String s = props.getProperty(propName);
+        if (s != null) return Long.parseLong(s);
+        else return defaultValue;
+    }
+        
     public void setNodeId(String nodeId) {
         _nodeId = nodeId;
     }
@@ -123,6 +155,10 @@
         _nearFutureInterval = nearFutureInterval;
     }
 
+    public void setTransactionsPerSecond(int tps) {
+        _tps = tps;
+    }
+
     public void setTransactionManager(TransactionManager txm) {
         _txm = txm;
     }
@@ -131,6 +167,10 @@
         _db = dbd;
     }
 
+    public void setExecutorService(ExecutorService executorService) {
+        _exec = executorService;
+    }
+
     public void cancelJob(String jobId) throws ContextException {
         _todo.dequeue(new Job(0, jobId, false, null));
         try {
@@ -141,6 +181,20 @@
         }
     }
 
+    public <T> Future<T> execIsolatedTransaction(final Callable<T> transaction) throws Exception, ContextException {
+        return _exec.submit(new Callable<T>() {
+            public T call() throws Exception {
+                try {
+                    return execTransaction(transaction);
+                } catch (Exception e) {
+                    __log.error("An exception occured while executing an isolated transaction, " +
+                            "the transaction is going to be abandoned.", e);
+                    return null;
+                }
+            }
+        });
+    }
+
     public <T> T execTransaction(Callable<T> transaction) throws Exception, ContextException {
         try {
             if (__log.isDebugEnabled()) __log.debug("Beginning a new transaction");
@@ -167,6 +221,24 @@
         }
     }
 
+    public void registerSynchronizer(final Synchronizer synch) throws ContextException {
+        try {
+            _txm.getTransaction().registerSynchronization(new Synchronization() {
+
+                public void beforeCompletion() {
+                    synch.beforeCompletion();
+                }
+
+                public void afterCompletion(int status) {
+                    synch.afterCompletion(status == Status.STATUS_COMMITTED);
+                }
+
+            });
+        } catch (Exception e) {
+            throw new ContextException("Unable to register synchronizer.", e);
+        }
+    }
+
     public String schedulePersistedJob(final Map<String, Object> jobDetail, Date when) throws ContextException {
         long ctime = System.currentTimeMillis();
         if (when == null)
@@ -182,25 +254,20 @@
 
         try {
             if (immediate) {
-                // If we have too many jobs in the queue, we don't allow any new ones
-                if (_todo.size() > _todoLimit) {
-                    __log.error("The execution queue is backed up, the engine can't keep up with the load. Either " +
-                            "increase the queue size or regulate the flow.");
-                    return null;
-                }
-
                 // Immediate scheduling means we put it in the DB for safe keeping
                 _db.insertJob(job, _nodeId, true);
+                
                 // And add it to our todo list .
-                addTodoOnCommit(job);
-
+                if (_todo.size() < _todoLimit) {
+                    addTodoOnCommit(job);
+                }
                 __log.debug("scheduled immediate job: " + job.jobId);
             } else if (nearfuture) {
                 // Near future, assign the job to ourselves (why? -- this makes it very unlikely that we
                 // would get two nodes trying to process the same instance, which causes unsightly rollbacks).
                 _db.insertJob(job, _nodeId, false);
                 __log.debug("scheduled near-future job: " + job.jobId);
-            } else /* far future */{
+            } else /* far future */ {
                 // Not the near future, we don't assign a node-id, we'll assign it later.
                 _db.insertJob(job, null, false);
                 __log.debug("scheduled far-future job: " + job.jobId);
@@ -235,6 +302,9 @@
         if (_running)
             return;
 
+        if (_exec == null)
+            _exec = Executors.newCachedThreadPool();
+
         _todo.clearTasks(UpgradeJobsTask.class);
         _todo.clearTasks(LoadImmediateTask.class);
         _todo.clearTasks(CheckStaleNodes.class);
@@ -255,23 +325,28 @@
             throw new ContextException("Error retrieving node list.", ex);
         }
 
+        long now = System.currentTimeMillis();
+        
         // Pretend we got a heartbeat...
-        for (String s : _knownNodes)
-            _lastHeartBeat.put(s, System.currentTimeMillis());
+        for (String s : _knownNodes) _lastHeartBeat.put(s, now);
 
         // schedule immediate job loading for now!
-        _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis()));
+        _todo.enqueue(new LoadImmediateTask(now));
 
         // schedule check for stale nodes, make it random so that the nodes don't overlap.
-        _todo.enqueue(new CheckStaleNodes(System.currentTimeMillis() + (long) (_random.nextDouble() * _staleInterval)));
+        _todo.enqueue(new CheckStaleNodes(now + randomMean(_staleInterval)));
 
         // do the upgrade sometime (random) in the immediate interval.
-        _todo.enqueue(new UpgradeJobsTask(System.currentTimeMillis() + (long) (_random.nextDouble() * _immediateInterval)));
+        _todo.enqueue(new UpgradeJobsTask(now + randomMean(_immediateInterval)));
 
         _todo.start();
         _running = true;
     }
-
+    
+    private long randomMean(long mean) {
+        return (long) (_random.nextDouble() * mean) + (mean/2); // scale before truncating: uniform in [mean/2, 3*mean/2)
+    }
+        
     public synchronized void stop() {
         if (!_running)
             return;
@@ -283,28 +358,6 @@
         _running = false;
     }
 
-    public void jobCompleted(String jobId) {
-        boolean deleted = false;
-        try {
-            deleted = _db.deleteJob(jobId, _nodeId);
-        } catch (DatabaseException de) {
-            String errmsg = "Database error.";
-            __log.error(errmsg, de);
-            throw new ContextException(errmsg, de);
-        }
-
-        if (!deleted) {
-            try {
-                _txm.getTransaction().setRollbackOnly();
-            } catch (Exception ex) {
-                __log.error("Transaction manager error; setRollbackOnly() failed.", ex);
-            }
-
-            throw new ContextException("Job no longer in database: jobId=" + jobId);
-        }
-    }
-
-
     /**
      * Run a job in the current thread.
      *
@@ -315,53 +368,70 @@
         final Scheduler.JobInfo jobInfo = new Scheduler.JobInfo(job.jobId, job.detail,
                 (Integer)(job.detail.get("retry") != null ? job.detail.get("retry") : 0));
 
-        try {
-            try {
-                _jobProcessor.onScheduledJob(jobInfo);
-            } catch (JobProcessorException jpe) {
-                if (jpe.retry)
-                    __log.error("Error while processing transaction, retrying in " + doRetry(job) + "s");
-                else
-                    __log.error("Error while processing transaction, no retry.", jpe);
+        _exec.submit(new Callable<Void>() {
+            public Void call() throws Exception {
+                if (job.transacted) {
+                    try {
+                        execTransaction(new Callable<Void>() {
+                            public Void call() throws Exception {
+                                if (job.persisted)
+                                    if (!_db.deleteJob(job.jobId, _nodeId))
+                                        throw new JobNoLongerInDbException(job.jobId,_nodeId);
+                                    
+                                try {
+                                    _jobProcessor.onScheduledJob(jobInfo);
+                                } catch (JobProcessorException jpe) {
+                                    if (jpe.retry) {
+                                        int retry = job.detail.get("retry") != null ? (((Integer)job.detail.get("retry")) + 1) : 0;
+                                        if (retry <= 10) {
+                                            long delay = doRetry(job);
+                                            __log.error("Error while processing transaction, retrying in " + delay + "s");
+                                        } else {
+                                            __log.error("Error while processing transaction after 10 retries, no more retries:"+job);
+                                        }
+                                    } else {
+                                        __log.error("Error while processing transaction, no retry.", jpe);
+                                    }
+                                }
+                                return null;
+                            }
+                        });
+                    } catch (JobNoLongerInDbException jde) {
+                        // This may happen if two nodes try to do the same job... we try to avoid
+                        // it, but the synchronization is best-effort, not perfect.
+                        __log.debug("job no longer in db forced rollback.");
+                    } catch (Exception ex) {
+                        __log.error("Error while executing transaction", ex);
+                    }
+                } else {
+                    _jobProcessor.onScheduledJob(jobInfo);
+                }
+                return null;
             }
-        } catch (Exception ex) {
-            __log.error("Error in scheduler processor.", ex);
-        }
-
+        });
     }
 
     private void addTodoOnCommit(final Job job) {
+        registerSynchronizer(new Synchronizer() {
 
-        Transaction tx;
-        try {
-            tx = _txm.getTransaction();
-        } catch (Exception ex) {
-            String errmsg = "Transaction manager error; unable to obtain transaction.";
-            __log.error(errmsg, ex);
-            throw new ContextException(errmsg, ex);
-        }
-
-        if (tx == null)
-            throw new ContextException("Missing required transaction in thread " + Thread.currentThread());
-
-        try {
-            tx.registerSynchronization(new Synchronization() {
-
-                public void afterCompletion(int status) {
-                    if (status == Status.STATUS_COMMITTED) {
-                        _todo.enqueue(job);
-                    }
+            public void afterCompletion(boolean success) {
+                if (success) {
+                    _todo.enqueue(job);
                 }
+            }
 
-                public void beforeCompletion() {
-                }
+            public void beforeCompletion() {
+            }
 
-            });
+        });
+    }
 
-        } catch (Exception e) {
-            String errmsg = "Unable to registrer synchronizer. ";
-            __log.error(errmsg, e);
-            throw new ContextException(errmsg, e);
+    public boolean isTransacted() {
+        try {
+            Transaction tx = _txm.getTransaction();
+            return (tx != null && tx.getStatus() != Status.STATUS_NO_TRANSACTION);
+        } catch (SystemException e) {
+            throw new ContextException("Internal Error: Could not obtain transaction status.", e); // keep the cause
         }
     }
 
@@ -385,21 +455,25 @@
 
     boolean doLoadImmediate() {
         __log.debug("LOAD IMMEDIATE started");
+        
+        // don't load anything if we're already half-full;  we've got plenty to do already
+        if (_todo.size() > _todoLimit/2) return true;
+        
         List<Job> jobs;
         try {
-            do {
-                jobs = execTransaction(new Callable<List<Job>>() {
-                    public List<Job> call() throws Exception {
-                        return _db.dequeueImmediate(_nodeId, System.currentTimeMillis() + _immediateInterval, 10);
-                    }
-                });
-                for (Job j : jobs) {
-                    if (__log.isDebugEnabled())
-                        __log.debug("todo.enqueue job from db: " + j.jobId + " for " + j.schedDate);
+            final int batch = (int) (_immediateInterval * _tps / 1000);
+            jobs = execTransaction(new Callable<List<Job>>() {
+                public List<Job> call() throws Exception {
+                    return _db.dequeueImmediate(_nodeId, System.currentTimeMillis() + _immediateInterval, batch);
+                }
+            });
+            for (Job j : jobs) {
+                if (__log.isDebugEnabled())
+                    __log.debug("todo.enqueue job from db: " + j.jobId + " for " + j.schedDate);
 
+                if (_todo.size() < _todoLimit) 
                     _todo.enqueue(j);
-                }
-            } while (jobs.size() == 10);
+            }
             return true;
         } catch (Exception ex) {
             __log.error("Error loading immediate jobs from database.", ex);
@@ -447,7 +521,6 @@
 
     /**
      * Re-assign stale node's jobs to self.
-     *
      * @param nodeId
      */
     void recoverStaleNode(final String nodeId) {
@@ -476,6 +549,7 @@
         } finally {
             __log.debug("node recovery complete");
         }
+
     }
 
     private long doRetry(Job job) throws DatabaseException {
@@ -507,9 +581,9 @@
                 success = doLoadImmediate();
             } finally {
                 if (success)
-                    _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis() + (long) (_immediateInterval * .75)));
+                    _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis() + (long) (_immediateInterval * .90)));
                 else
-                    _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis() + 100));
+                    _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis() + 1000));
             }
         }
 
@@ -517,7 +591,6 @@
 
     /**
      * Upgrade jobs from far future to immediate future (basically, assign them to a node).
-     *
      * @author mszefler
      *
      */
@@ -544,7 +617,7 @@
             try {
                 success = doUpgrade();
             } finally {
-                long future = System.currentTimeMillis() + (success ? (long) (_nearFutureInterval * .50) : 100);
+                long future = System.currentTimeMillis() + (success ? (long) (_nearFutureInterval * .50) : 1000);
                 _nextUpgrade.set(future);
                 _todo.enqueue(new UpgradeJobsTask(future));
                 __log.debug("UPGRADE completed, success = " + success + "; next time in " + (future - ctime) + "ms");
@@ -567,11 +640,16 @@
             __log.debug("CHECK STALE NODES started");
             for (String nodeId : _knownNodes) {
                 Long lastSeen = _lastHeartBeat.get(nodeId);
-                if (lastSeen == null || (System.currentTimeMillis() - lastSeen) > _staleInterval)
+                if ((lastSeen == null || (System.currentTimeMillis() - lastSeen) > _staleInterval)
+                    && !_nodeId.equals(nodeId))
+                {
                     recoverStaleNode(nodeId);
+                }
             }
         }
 
+
     }
 
+
 }

Modified: ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java?rev=717872&r1=717871&r2=717872&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java (original)
+++ ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java Sat Nov 15 08:04:47 2008
@@ -43,7 +43,7 @@
         _ds = new DelegateSupport();
         _del = _ds.delegate();
     }
-     
+    
     
     public void testGetNodeIds() throws Exception {
         // should have no node ids in the db, empty list (not null)
@@ -91,8 +91,9 @@
         assertEquals("j1",jobs.get(0).jobId);
         jobs = _del.dequeueImmediate("n1", 250L, 1000);
         assertNotNull(jobs);
-        assertEquals(1, jobs.size());
-        assertEquals("j2",jobs.get(0).jobId);
+        assertEquals(2, jobs.size());
+        assertEquals("j1",jobs.get(0).jobId);
+        assertEquals("j2",jobs.get(1).jobId);
     }
     
     public void testScheduleImmediateMaxRows() throws Exception {
@@ -103,10 +104,6 @@
         assertNotNull(jobs);
         assertEquals(1, jobs.size());
         assertEquals("j1",jobs.get(0).jobId);
-        jobs = _del.dequeueImmediate("n1", 250L, 1000);
-        assertNotNull(jobs);
-        assertEquals(1, jobs.size());
-        assertEquals("j2",jobs.get(0).jobId);
     }
 
     public void testScheduleImmediateNodeFilter() throws Exception {

Modified: ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java?rev=717872&r1=717871&r2=717872&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java (original)
+++ ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java Sat Nov 15 08:04:47 2008
@@ -51,7 +51,7 @@
 
     public void onScheduledJob(Scheduler.JobInfo jobInfo) throws Scheduler.JobProcessorException {
         _tried++;
-        throw new Scheduler.JobProcessorException(jobInfo.retryCount < 2);
+        throw new Scheduler.JobProcessorException(jobInfo.retryCount < 3);
     }
 
     Map<String, Object> newDetail(String x) {

Modified: ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerThreadTest.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerThreadTest.java?rev=717872&r1=717871&r2=717872&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerThreadTest.java (original)
+++ ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerThreadTest.java Sat Nov 15 08:04:47 2008
@@ -52,7 +52,7 @@
         _st.start();
         long schedtime = System.currentTimeMillis() + 300;
         _st.enqueue(new Task(schedtime));
-        Thread.sleep(600);
+        Thread.sleep(1000);
         assertEquals(1,_tasks.size());
         assertTrue(_tasks.get(0).time < schedtime + SCHED_TOLERANCE / 2);
         assertTrue(_tasks.get(0).time > schedtime - SCHED_TOLERANCE / 2);

Modified: ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java?rev=717872&r1=717871&r2=717872&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java (original)
+++ ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java Sat Nov 15 08:04:47 2008
@@ -21,29 +21,34 @@
 
 import java.util.*;
 
+import javax.transaction.RollbackException;
+import javax.transaction.Status;
+import javax.transaction.Synchronization;
+import javax.transaction.SystemException;
 import javax.transaction.TransactionManager;
 
 import junit.framework.TestCase;
 
-import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
 import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
 import org.apache.ode.bpel.iapi.Scheduler.JobProcessor;
 import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException;
+import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
 
 public class SimpleSchedulerTest extends TestCase implements JobProcessor {
 
     DelegateSupport _ds;
     SimpleScheduler _scheduler;
     ArrayList<JobInfo> _jobs;
+    ArrayList<JobInfo> _commit;
     TransactionManager _txm;
 
-
     public void setUp() throws Exception {
         _txm = new GeronimoTransactionManager();
         _ds = new DelegateSupport();
 
         _scheduler = newScheduler("n1");
         _jobs = new ArrayList<JobInfo>(100);
+        _commit = new ArrayList<JobInfo>(100);
     }
 
     public void tearDown() throws Exception {
@@ -52,21 +57,25 @@
 
     public void testConcurrentExec() throws Exception  {
         _scheduler.start();
-        _txm.begin();
-        String jobId;
-        try {
-            jobId = _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 100));
-            Thread.sleep(200);
-            // Make sure we don't schedule until commit.
-            assertEquals(0, _jobs.size());
-        } finally {
-            _txm.commit();
+        for (int i=0; i<10; i++) {
+            _txm.begin();
+            String jobId;
+            try {
+                int jobs = _jobs.size();
+                jobId = _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 200));
+                Thread.sleep(100);
+                // Make sure we don't schedule until commit.
+                assertEquals(jobs, _jobs.size());
+            } finally {
+                _txm.commit();
+            }
+            // Delete from DB
+            assertEquals(true,_ds.delegate().deleteJob(jobId, "n1"));
+            // Wait for the job to be execed.
+            Thread.sleep(250);
+            // We should always have same number of jobs/commits
+            assertEquals(_jobs.size(), _commit.size());
         }
-        // Wait for the job to be execed.
-        Thread.sleep(100);
-        // Should execute job,
-        assertEquals(1, _jobs.size());
-
     }
     
     public void testImmediateScheduling() throws Exception {
@@ -107,102 +116,124 @@
 
     public void testNearFutureScheduling() throws Exception {
         // speed things up a bit to hit the right code paths
-        _scheduler.setNearFutureInterval(1000);
-        _scheduler.setImmediateInterval(500);
+        _scheduler.setNearFutureInterval(10000);
+        _scheduler.setImmediateInterval(5000);
         _scheduler.start();
 
         _txm.begin();
         try {
-            _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 750));
+            _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 7500));
         } finally {
             _txm.commit();
         }
 
-        Thread.sleep(850);
+        Thread.sleep(8500);
         assertEquals(1, _jobs.size());
     }
 
     public void testFarFutureScheduling() throws Exception {
         // speed things up a bit to hit the right code paths
-        _scheduler.setNearFutureInterval(700);
-        _scheduler.setImmediateInterval(300);
+        _scheduler.setNearFutureInterval(7000);
+        _scheduler.setImmediateInterval(3000);
         _scheduler.start();
 
         _txm.begin();
         try {
-            _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 750));
+            _scheduler.schedulePersistedJob(newDetail("123"), new Date(System.currentTimeMillis() + 7500));
         } finally {
             _txm.commit();
         }
 
-        Thread.sleep(850);
+        Thread.sleep(8500);
         assertEquals(1, _jobs.size());
     }
 
     public void testRecovery() throws Exception {
         // speed things up a bit to hit the right code paths
-        _scheduler.setNearFutureInterval(200);
-        _scheduler.setImmediateInterval(100);
-        _scheduler.setStaleInterval(50);
+        _scheduler.setNearFutureInterval(2000);
+        _scheduler.setImmediateInterval(1000);
+        _scheduler.setStaleInterval(500);
 
         _txm.begin();
         try {
             _scheduler.schedulePersistedJob(newDetail("immediate"), new Date(System.currentTimeMillis()));
-            _scheduler.schedulePersistedJob(newDetail("near"), new Date(System.currentTimeMillis() + 110));
-            _scheduler.schedulePersistedJob(newDetail("far"), new Date(System.currentTimeMillis() + 250));
+            _scheduler.schedulePersistedJob(newDetail("near"), new Date(System.currentTimeMillis() + 1100));
+            _scheduler.schedulePersistedJob(newDetail("far"), new Date(System.currentTimeMillis() + 2500));
         } finally {
             _txm.commit();
         }
 
         _scheduler = newScheduler("n3");
-        _scheduler.setNearFutureInterval(200);
-        _scheduler.setImmediateInterval(100);
-        _scheduler.setStaleInterval(50);
+        _scheduler.setNearFutureInterval(2000);
+        _scheduler.setImmediateInterval(1000);
+        _scheduler.setStaleInterval(1000);
         _scheduler.start();
-        Thread.sleep(400);
+        Thread.sleep(4000);
         assertEquals(3, _jobs.size());
     }
 
     public void testRecoverySuppressed() throws Exception {
         // speed things up a bit to hit the right code paths
-        _scheduler.setNearFutureInterval(200);
-        _scheduler.setImmediateInterval(100);
-        _scheduler.setStaleInterval(50);
+        _scheduler.setNearFutureInterval(2000);
+        _scheduler.setImmediateInterval(1000);
+        _scheduler.setStaleInterval(500);
 
-        // schedule some jobs ...
         _txm.begin();
         try {
             _scheduler.schedulePersistedJob(newDetail("immediate"), new Date(System.currentTimeMillis()));
-            _scheduler.schedulePersistedJob(newDetail("near"), new Date(System.currentTimeMillis() + 150));
-            _scheduler.schedulePersistedJob(newDetail("far"), new Date(System.currentTimeMillis() + 250));
+            _scheduler.schedulePersistedJob(newDetail("near"), new Date(System.currentTimeMillis() + 1100));
+            _scheduler.schedulePersistedJob(newDetail("far"), new Date(System.currentTimeMillis() + 15000));
         } finally {
             _txm.commit();
-        } 
+        }
+        _scheduler.stop();
 
-        // but don't start the scheduler.... 
-        
-        // create a second node for the scheduler. 
-        SimpleScheduler scheduler = newScheduler("n3");
-        scheduler.setNearFutureInterval(200);
-        scheduler.setImmediateInterval(100);
-        scheduler.setStaleInterval(50);
-        scheduler.start();
+        _scheduler = newScheduler("n3");
+        _scheduler.setNearFutureInterval(2000);
+        _scheduler.setImmediateInterval(1000);
+        _scheduler.setStaleInterval(1000);
+        _scheduler.start();
         for (int i = 0; i < 40; ++i) {
-            scheduler.updateHeartBeat("n1");
-            Thread.sleep(10);
+            _scheduler.updateHeartBeat("n1");
+            Thread.sleep(100);
         }
 
-        scheduler.stop();
+        _scheduler.stop();
+        Thread.sleep(1000);
 
-        assertTrue(_jobs.size() <= 1);
-        if (_jobs.size() == 1)
-            assertEquals("far", _jobs.get(0).jobDetail.get("foo"));
+        assertEquals(0, _jobs.size());
     }
 
     public void onScheduledJob(final JobInfo jobInfo) throws JobProcessorException {
         synchronized (_jobs) {
             _jobs.add(jobInfo);
         }
+        
+        try {
+            _txm.getTransaction().registerSynchronization(new Synchronization() {
+
+                public void afterCompletion(int arg0) {
+                    if (arg0 == Status.STATUS_COMMITTED) 
+                        _commit.add(jobInfo);
+                }
+
+                public void beforeCompletion() {
+                    // TODO Auto-generated method stub
+                    
+                }
+                
+            });
+        } catch (IllegalStateException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        } catch (RollbackException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        } catch (SystemException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+
     }
 
     Map<String, Object> newDetail(String x) {