You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by je...@apache.org on 2010/05/02 19:03:00 UTC

svn commit: r940263 [15/16] - in /ode/trunk: ./ axis2-war/ axis2-war/src/main/assembly/ axis2-war/src/main/webapp/WEB-INF/conf.hib-derby/ axis2-war/src/main/webapp/WEB-INF/conf.jpa-derby/ axis2-war/src/main/webapp/WEB-INF/conf/ axis2-war/src/test/java/...

Modified: ode/trunk/pom.xml
URL: http://svn.apache.org/viewvc/ode/trunk/pom.xml?rev=940263&r1=940262&r2=940263&view=diff
==============================================================================
--- ode/trunk/pom.xml (original)
+++ ode/trunk/pom.xml Sun May  2 17:02:51 2010
@@ -64,7 +64,8 @@
         <wsdl4j.version>1.6.2</wsdl4j.version>
 	<woodstox.version>3.2.4</woodstox.version>
 	<javax.mail.version>1.4</javax.mail.version>
-        <hibernate.version>3.3.1.GA</hibernate.version>
+        <hibernate.version>3.3.2.GA</hibernate.version>
+	<hibernate.entitymanager.version>3.4.0.GA</hibernate.entitymanager.version>
         <javassist.version>3.4.GA</javassist.version>
 	<spring.version>2.5.6</spring.version>
         <geronimo.specs.version>1.0</geronimo.specs.version>
@@ -88,7 +89,8 @@
         <xmlbeans.version>2.3.0</xmlbeans.version>
         <xstream.version>1.2</xstream.version>
 	<junit.version>4.4</junit.version>
-        <hsqldb.version>1.8.0.7</hsqldb.version>
+        <hsqldb.version>1.8.0.10</hsqldb.version>
+        <h2.version>1.2.131</h2.version>
 	<persistence-api.version>1.0</persistence-api.version>
 	<xalan.version>2.7.1</xalan.version>
         <ant.version>1.6.5</ant.version>
@@ -123,16 +125,18 @@
       <module>dao-hibernate</module>
       <module>tools</module>
       <module>bpel-store</module>
-	  <module>dao-jpa-db</module>
+      <module>dao-jpa-db</module>
+      <module>dao-jpa-ojpa</module>
+      <module>dao-jpa-hibernate</module>
       <module>dao-hibernate-db</module>
       <module>engine</module>
       <module>bpel-connector</module>
-	  <module>bpel-test</module>
-	  <module>extensions</module>
-	  <module>axis2</module>
-	  <module>jbi</module>
-	  <module>axis2-war</module>
-	  <module>distro</module>
+      <module>bpel-test</module>
+      <module>extensions</module>
+      <module>axis2</module>
+      <module>jbi</module>
+      <module>axis2-war</module>
+      <module>distro</module>
     </modules>
 
     <build>
@@ -199,7 +203,7 @@
 		      <groupId>org.apache.derby</groupId>
 		      <artifactId>derbytools</artifactId>
 		      <version>${derby.version}</version>
-		    </dependency> 
+		    </dependency>
 		</dependencies>
 	      </plugin>
 
@@ -386,6 +390,22 @@
                 <artifactId>ode-dao-jpa</artifactId>
                 <version>${ode.version}</version>
             </dependency>
+			<dependency>
+              <groupId>org.apache.ode</groupId>
+              <artifactId>ode-dao-jpa</artifactId>
+              <version>${project.version}</version>
+              <classifier>openjpa</classifier>
+            </dependency>
+            <dependency>
+              <groupId>org.apache.ode</groupId>
+              <artifactId>ode-dao-jpa-ojpa</artifactId>
+              <version>${project.version}</version>
+            </dependency>
+            <dependency>
+              <groupId>org.apache.ode</groupId>
+              <artifactId>ode-dao-jpa-hibernate</artifactId>
+              <version>${project.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.apache.ode</groupId>
                 <artifactId>ode-jca-ra</artifactId>
@@ -703,7 +723,19 @@
                     </exclusion>
                 </exclusions>
             </dependency>
- 
+
+			<dependency>
+                <groupId>org.hibernate</groupId>
+                <artifactId>hibernate-entitymanager</artifactId>
+                <version>${hibernate.entitymanager.version}</version>
+				<exclusions>
+                    <exclusion>
+                        <artifactId>jta</artifactId>
+                        <groupId>javax.transaction</groupId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+
             <dependency>
                 <groupId>javassist</groupId>
                 <artifactId>javassist</artifactId>
@@ -806,6 +838,12 @@
                 <artifactId>hsqldb</artifactId>
                 <version>${hsqldb.version}</version>
             </dependency>
+            <dependency>
+                <groupId>com.h2database</groupId>
+                <artifactId>h2</artifactId>
+                <version>${h2.version}</version>
+            </dependency>
+
 
             <!-- AXIS2 dependencies -->
             <dependency>
@@ -893,3 +931,5 @@
     </dependencyManagement>
 
 </project>
+
+

Modified: ode/trunk/scheduler-simple/pom.xml
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/pom.xml?rev=940263&r1=940262&r2=940263&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/pom.xml (original)
+++ ode/trunk/scheduler-simple/pom.xml Sun May  2 17:02:51 2010
@@ -19,77 +19,145 @@
   -->
 
 <project>
-    <modelVersion>4.0.0</modelVersion>
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.apache.ode</groupId>
+  <artifactId>ode-scheduler-simple</artifactId>
+  <name>ODE :: BPEL Scheduler Simple</name>
+  <parent>
     <groupId>org.apache.ode</groupId>
-    <artifactId>ode-scheduler-simple</artifactId>
-    <name>ODE :: BPEL Scheduler Simple</name>
-    <parent>
-        <groupId>org.apache.ode</groupId>
-        <artifactId>ode</artifactId>
-        <version>2.0-SNAPSHOT</version>
-    </parent>
+    <artifactId>ode</artifactId>
+    <version>2.0-SNAPSHOT</version>
+  </parent>
 
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.ode</groupId>
-            <artifactId>ode-bpel-api</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.ode</groupId>
-            <artifactId>ode-utils</artifactId>
-        </dependency>
-        <dependency>
-          <groupId>commons-collections</groupId>
-          <artifactId>commons-collections</artifactId>
-        </dependency>
-        <dependency>
-          <groupId>commons-logging</groupId>
-          <artifactId>commons-logging</artifactId>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.geronimo.specs</groupId>
-          <artifactId>geronimo-jta_1.1_spec</artifactId>
-        </dependency>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.ode</groupId>
+      <artifactId>ode-bpel-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.ode</groupId>
+      <artifactId>ode-bpel-dao</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.ode</groupId>
+      <artifactId>ode-il-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.ode</groupId>
+      <artifactId>ode-utils</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-collections</groupId>
+      <artifactId>commons-collections</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.geronimo.specs</groupId>
+      <artifactId>geronimo-jta_1.1_spec</artifactId>
+    </dependency>
 
-        <dependency>
-          <groupId>junit</groupId>
-          <artifactId>junit</artifactId>
-	  <scope>test</scope>
-        </dependency>
-        <dependency>
-          <groupId>hsqldb</groupId>
-          <artifactId>hsqldb</artifactId>
-	  <scope>test</scope>
-        </dependency>
-        <dependency>
-          <groupId>backport-util-concurrent</groupId>
-          <artifactId>backport-util-concurrent</artifactId>
-	  <scope>test</scope>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.geronimo.modules</groupId>
-          <artifactId>geronimo-kernel</artifactId>
-	  <scope>test</scope>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.geronimo.components</groupId>
-          <artifactId>geronimo-transaction</artifactId>
-	  <scope>test</scope>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.geronimo.specs</groupId>
-          <artifactId>geronimo-j2ee-connector_1.5_spec</artifactId>
-	  <scope>test</scope>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.geronimo.specs</groupId>
-          <artifactId>geronimo-ejb_2.1_spec</artifactId>
-	  <scope>test</scope>
-        </dependency>
-        <dependency>
-          <groupId>log4j</groupId>
-          <artifactId>log4j</artifactId>
-	  <scope>test</scope>
-        </dependency>
-    </dependencies>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.h2database</groupId>
+      <artifactId>h2</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>backport-util-concurrent</groupId>
+      <artifactId>backport-util-concurrent</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.geronimo.modules</groupId>
+      <artifactId>geronimo-kernel</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.geronimo.components</groupId>
+      <artifactId>geronimo-transaction</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.geronimo.specs</groupId>
+      <artifactId>geronimo-j2ee-connector_1.5_spec</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.geronimo.specs</groupId>
+      <artifactId>geronimo-ejb_2.1_spec</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.ode</groupId>
+      <artifactId>ode-dao-jpa-ojpa</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <!-- for the integration tests - seems to work with the openjpa enhanced entities -->
+    <dependency>
+      <groupId>org.apache.ode</groupId>
+      <artifactId>ode-dao-jpa-hibernate</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>hibernate-jpa</id>
+            <phase>integration-test</phase>
+            <goals>
+              <goal>test</goal>
+            </goals>
+            <configuration>
+              <reportsDirectory>${project.build.directory}/surefire-reports/hibernate-jpa</reportsDirectory>
+              <systemProperties>
+                <property>
+                  <name>dao.factory.scheduler</name>
+                  <value>org.apache.ode.dao.jpa.hibernate.SchedulerDAOConnectionFactoryImpl</value>
+                </property>
+              </systemProperties>
+              <excludes>
+                <exclude>**/SchedulerThreadTest.java</exclude>
+              </excludes>
+            </configuration>
+          </execution>
+          <execution>
+            <id>jdbc</id>
+            <phase>integration-test</phase>
+            <goals>
+              <goal>test</goal>
+            </goals>
+            <configuration>
+              <reportsDirectory>${project.build.directory}/surefire-reports/jdbc</reportsDirectory>
+              <systemProperties>
+                <property>
+                  <name>dao.factory.scheduler</name>
+                  <value>org.apache.ode.scheduler.simple.jdbc.SchedulerDAOConnectionFactoryImpl</value>
+                </property>
+              </systemProperties>
+              <excludes>
+                <exclude>**/SchedulerThreadTest.java</exclude>
+              </excludes>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
 </project>

Added: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JobDAOTask.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JobDAOTask.java?rev=940263&view=auto
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JobDAOTask.java (added)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JobDAOTask.java Sun May  2 17:02:51 2010
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.scheduler.simple;
+
+import org.apache.ode.dao.scheduler.JobDAO;
+
+/**
+ * The thing that we schedule: a {@link Task} for a job, identified (for equality and hashing) by its job id.
+ * 
+ * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
+ *
+ */
+class JobDAOTask extends Task {
+
+  public JobDAO dao;
+  public String jobId;
+
+  JobDAOTask(JobDAO job) {
+    super(job.getScheduledDate());
+    this.dao = job;
+    this.jobId=job.getJobId();
+  }
+
+  /**
+   * Used for dequeuing a job without a lookup; the resulting task has no DAO attached.
+   * @param jobId the id of the job to dequeue
+   */
+  JobDAOTask(String jobId) {
+    super(0L);
+    this.jobId=jobId;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    return obj instanceof JobDAOTask && jobId.equals(((JobDAOTask) obj).jobId);
+  }
+
+  @Override
+  public int hashCode() {
+    return jobId.hashCode();
+  }
+}

Modified: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java?rev=940263&r1=940262&r2=940263&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java (original)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java Sun May  2 17:02:51 2010
@@ -30,7 +30,7 @@ import org.apache.ode.utils.stl.Collecti
 import org.apache.ode.utils.stl.MemberOfFunction;
 
 /**
- * Implements the "todo" queue and prioritized scheduling mechanism. 
+ * Implements the "todo" queue and prioritized scheduling mechanism.
  *
  * @author mszefler
  * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
@@ -58,7 +58,7 @@ class SchedulerThread implements Runnabl
 
     SchedulerThread(TaskRunner runner) {
         _todo = new PriorityBlockingQueue<Task>(TODO_QUEUE_INITIAL_CAPACITY,
-                new JobComparatorByDate());
+                new TaskComparatorByDate());
         _taskrunner = runner;
     }
 

Modified: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java?rev=940263&r1=940262&r2=940263&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java (original)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java Sun May  2 17:02:51 2010
@@ -16,27 +16,24 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.ode.scheduler.simple;
 
 import java.util.*;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicLong;
-
 import javax.transaction.Status;
 import javax.transaction.Synchronization;
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
-import javax.xml.namespace.QName;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.ode.bpel.common.CorrelationKey;
 import org.apache.ode.bpel.iapi.ContextException;
 import org.apache.ode.bpel.iapi.Scheduler;
-import org.apache.ode.bpel.iapi.Scheduler.JobType;
+import org.apache.ode.dao.scheduler.JobDAO;
+import org.apache.ode.dao.scheduler.SchedulerDAOConnection;
+import org.apache.ode.dao.scheduler.SchedulerDAOConnectionFactory;
 
 /**
  * A reliable and relatively simple scheduler that uses a database to persist information about scheduled tasks.
@@ -56,305 +53,257 @@ import org.apache.ode.bpel.iapi.Schedule
  *
  */
 public class SimpleScheduler implements Scheduler, TaskRunner {
-    private static final Log __log = LogFactory.getLog(SimpleScheduler.class);
-
-    /**
-     * Jobs scheduled with a time that is between [now, now+immediateInterval] will be assigned to the current node, and placed
-     * directly on the todo queue.
-     */
-    long _immediateInterval = 30000;
-
-    /**
-     * Jobs sccheduled with a time that is between (now+immediateInterval,now+nearFutureInterval) will be assigned to the current
-     * node, but will not be placed on the todo queue (the promoter will pick them up).
-     */
-    long _nearFutureInterval = 10 * 60 * 1000;
-
-    /** 10s of no communication and you are deemed dead. */
-    long _staleInterval = 10000;
-
-    TransactionManager _txm;
-
-    String _nodeId;
-
-    /** Maximum number of jobs in the "near future" / todo queue. */
-    int _todoLimit = 10000;
-
-    /** The object that actually handles the jobs. */
-    volatile JobProcessor _jobProcessor;
-
-    volatile JobProcessor _polledRunnableProcessor;
-
-    private SchedulerThread _todo;
-
-    private DatabaseDelegate _db;
-
-    /** All the nodes we know about */
-    private CopyOnWriteArraySet<String> _knownNodes = new CopyOnWriteArraySet<String>();
-
-    /** When we last heard from our nodes. */
-    private ConcurrentHashMap<String, Long> _lastHeartBeat = new ConcurrentHashMap<String, Long>();
-
-    private boolean _running;
-
-    /** Time for next upgrade. */
-    private AtomicLong _nextUpgrade = new AtomicLong();
-
-    /** Time for next job load */
-    private AtomicLong _nextScheduleImmediate = new AtomicLong();
-
-    private Random _random = new Random();
-
-    public SimpleScheduler(String nodeId, DatabaseDelegate del, Properties conf) {
-        _nodeId = nodeId;
-        _db = del;
-        _todoLimit = Integer.parseInt(conf.getProperty("ode.scheduler.queueLength", "10000"));
-        _todo = new SchedulerThread(this);
-    }
-
-    public void setNodeId(String nodeId) {
-        _nodeId = nodeId;
-    }
-
-    public void setStaleInterval(long staleInterval) {
-        _staleInterval = staleInterval;
-    }
-
-    public void setImmediateInterval(long immediateInterval) {
-        _immediateInterval = immediateInterval;
-    }
-
-    public void setNearFutureInterval(long nearFutureInterval) {
-        _nearFutureInterval = nearFutureInterval;
-    }
-
-    public void setTransactionManager(TransactionManager txm) {
-        _txm = txm;
-    }
-
-    public void setDatabaseDelegate(DatabaseDelegate dbd) {
-        _db = dbd;
-    }
-
-    public void setPolledRunnableProcesser(JobProcessor polledRunnableProcessor) {
-        _polledRunnableProcessor = polledRunnableProcessor;
-    }
-
-    public void cancelJob(String jobId) throws ContextException {
-        _todo.dequeue(new Job(0, jobId, false, null));
-        try {
-            _db.deleteJob(jobId, _nodeId);
-        } catch (DatabaseException e) {
-            __log.debug("Job removal failed.", e);
-            throw new ContextException("Job removal failed.", e);
-        }
-    }
-
-    public <T> T execTransaction(Callable<T> transaction) throws Exception, ContextException {
-        try {
-            if (__log.isDebugEnabled()) __log.debug("Beginning a new transaction");
-            _txm.begin();
-        } catch (Exception ex) {
-            String errmsg = "Internal Error, could not begin transaction.";
-            throw new ContextException(errmsg, ex);
-        }
-        boolean success = false;
-        try {
-            T retval = transaction.call();
-            success = true;
-            return retval;
-        } catch (Exception ex) {
-            throw ex;
-        } finally {
-            if (success) {
-                if (__log.isDebugEnabled()) __log.debug("Commiting...");
-                _txm.commit();
-            } else {
-                if (__log.isDebugEnabled()) __log.debug("Rollbacking...");
-                _txm.rollback();
-            }
-        }
-    }
-
-    public String schedulePersistedJob(final JobDetails jobDetail, Date when) throws ContextException {
-        long ctime = System.currentTimeMillis();
-        if (when == null)
-            when = new Date(ctime);
-
-        if (__log.isDebugEnabled())
-            __log.debug("scheduling " + jobDetail + " for " + when);
-
-        return schedulePersistedJob(new Job(when.getTime(), true, jobDetail), when, ctime);
-    }
-
-    public String scheduleMapSerializableRunnable(MapSerializableRunnable runnable, Date when) throws ContextException {
-        long ctime = System.currentTimeMillis();
-        if (when == null)
-            when = new Date(ctime);
-
-        JobDetails jobDetails = new JobDetails();
-        jobDetails.getDetailsExt().put("runnable", runnable);
-        runnable.storeToDetails(jobDetails);
-        
-        if (__log.isDebugEnabled())
-            __log.debug("scheduling " + jobDetails + " for " + when);
-
-        return schedulePersistedJob(new Job(when.getTime(), true, jobDetails), when, ctime);
-    }
-
-    public String schedulePersistedJob(Job job, Date when, long ctime) throws ContextException {
-        boolean immediate = when.getTime() <= ctime + _immediateInterval;
-        boolean nearfuture = !immediate && when.getTime() <= ctime + _nearFutureInterval;
-        try {
-            if (immediate) {
-                // If we have too many jobs in the queue, we don't allow any new ones
-                if (_todo.size() > _todoLimit) {
-                    __log.error("The execution queue is backed up, the engine can't keep up with the load. Either " +
-                            "increase the queue size or regulate the flow.");
-                    return null;
-                }
-
-                // Immediate scheduling means we put it in the DB for safe keeping
-                _db.insertJob(job, _nodeId, true);
-                // And add it to our todo list .
-                addTodoOnCommit(job);
-
-                __log.debug("scheduled immediate job: " + job.jobId);
-            } else if (nearfuture) {
-                // Near future, assign the job to ourselves (why? -- this makes it very unlikely that we
-                // would get two nodes trying to process the same instance, which causes unsightly rollbacks).
-                _db.insertJob(job, _nodeId, false);
-                __log.debug("scheduled near-future job: " + job.jobId);
-            } else /* far future */{
-                // Not the near future, we don't assign a node-id, we'll assign it later.
-                _db.insertJob(job, null, false);
-                __log.debug("scheduled far-future job: " + job.jobId);
-            }
-        } catch (DatabaseException dbe) {
-            __log.error("Database error.", dbe);
-            throw new ContextException("Database error.", dbe);
-        }
-        return job.jobId;
-
-    }
-
-    public String scheduleVolatileJob(boolean transacted, JobDetails jobDetail) throws ContextException {
-        Job job = new Job(System.currentTimeMillis(), transacted, jobDetail);
-        job.persisted = false;
-        addTodoOnCommit(job);
-        return job.toString();
-    }
-
-    public void setJobProcessor(JobProcessor processor) throws ContextException {
-        _jobProcessor = processor;
-    }
-
-    public void shutdown() {
-        stop();
-        _jobProcessor = null;
-        _txm = null;
-        _todo = null;
-    }
-
-    public synchronized void start() {
-        if (_running)
-            return;
-
-        _todo.clearTasks(UpgradeJobsTask.class);
-        _todo.clearTasks(LoadImmediateTask.class);
-        _todo.clearTasks(CheckStaleNodes.class);
-
-        _knownNodes.clear();
-
-        try {
-            execTransaction(new Callable<Void>() {
-
-                public Void call() throws Exception {
-                    _knownNodes.addAll(_db.getNodeIds());
-                    return null;
-                }
-
-            });
-        } catch (Exception ex) {
-            __log.error("Error retrieving node list.", ex);
-            throw new ContextException("Error retrieving node list.", ex);
-        }
-
-        // Pretend we got a heartbeat...
-        for (String s : _knownNodes)
-            _lastHeartBeat.put(s, System.currentTimeMillis());
-
-        // schedule immediate job loading for now!
-        _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis()));
-
-        // schedule check for stale nodes, make it random so that the nodes don't overlap.
-        _todo.enqueue(new CheckStaleNodes(System.currentTimeMillis() + (long) (_random.nextDouble() * _staleInterval)));
-
-        // do the upgrade sometime (random) in the immediate interval.
-        _todo.enqueue(new UpgradeJobsTask(System.currentTimeMillis() + (long) (_random.nextDouble() * _immediateInterval)));
-
-        _todo.start();
-        _running = true;
-    }
-
-    public synchronized void stop() {
-        if (!_running)
-            return;
-
-        _todo.stop();
-        _todo.clearTasks(UpgradeJobsTask.class);
-        _todo.clearTasks(LoadImmediateTask.class);
-        _todo.clearTasks(CheckStaleNodes.class);
-        _running = false;
-    }
-
-    public void jobCompleted(String jobId) {
-        boolean deleted = false;
-        try {
-            deleted = _db.deleteJob(jobId, _nodeId);
-        } catch (DatabaseException de) {
-            String errmsg = "Database error.";
-            __log.error(errmsg, de);
-            throw new ContextException(errmsg, de);
-        }
 
+  private static final Log __log = LogFactory.getLog(SimpleScheduler.class);
+  /**
+   * Jobs scheduled with a time that is between [now, now+immediateInterval] will be assigned to the current node, and placed
+   * directly on the todo queue.
+   */
+  long _immediateInterval = 30000;
+  /**
+   * Jobs scheduled with a time that is between (now+immediateInterval,now+nearFutureInterval) will be assigned to the current
+   * node, but will not be placed on the todo queue (the promoter will pick them up).
+   */
+  long _nearFutureInterval = 10 * 60 * 1000;
+  /** 10s of no communication and you are deemed dead. */
+  long _staleInterval = 10000;
+  String _nodeId;
+  /** Maximum number of jobs in the "near future" / todo queue. */
+  int _todoLimit = 10000;
+  /** The object that actually handles the jobs. */
+  volatile JobProcessor _jobProcessor;
+  volatile JobProcessor _polledRunnableProcessor;
+  private SchedulerThread _todo;
+  private SchedulerDAOConnectionFactory _dbcf;
+  private TransactionManager _txm;
+  /** All the nodes we know about */
+  private CopyOnWriteArraySet<String> _knownNodes = new CopyOnWriteArraySet<String>();
+  /** When we last heard from our nodes. */
+  private ConcurrentHashMap<String, Long> _lastHeartBeat = new ConcurrentHashMap<String, Long>();
+  private boolean _running;
+  /** Time for next upgrade. */
+  private AtomicLong _nextUpgrade = new AtomicLong();
+  /** Time for next job load */
+  private AtomicLong _nextScheduleImmediate = new AtomicLong();
+  private Random _random = new Random();
+
+  public SimpleScheduler(String nodeId, SchedulerDAOConnectionFactory dbcf, TransactionManager mgr, Properties conf) {
+    _nodeId = nodeId;
+    _dbcf = dbcf;
+    _txm = mgr;
+    _todoLimit = Integer.parseInt(conf.getProperty("ode.scheduler.queueLength", "10000"));
+    _todo = new SchedulerThread(this);
+  }
+
+  public void setNodeId(String nodeId) {
+    _nodeId = nodeId;
+  }
+
+  public void setStaleInterval(long staleInterval) {
+    _staleInterval = staleInterval;
+  }
+
+  public void setImmediateInterval(long immediateInterval) {
+    _immediateInterval = immediateInterval;
+  }
+
+  public void setNearFutureInterval(long nearFutureInterval) {
+    _nearFutureInterval = nearFutureInterval;
+  }
+
+  public void setSchedulerDAOConnectionFactory(SchedulerDAOConnectionFactory dbcf) {
+    _dbcf = dbcf;
+  }
+
+  public void setPolledRunnableProcesser(JobProcessor polledRunnableProcessor) {
+    _polledRunnableProcessor = polledRunnableProcessor;
+  }
+
+    public void cancelJob(final String jobId) throws ContextException {
+        SchedulerDAOConnection conn = _dbcf.getConnection();
+        _todo.dequeue(new JobDAOTask(jobId));
+        if (!conn.deleteJob(jobId, _nodeId)) {
+            __log.debug("Job removal failed.");
+            throw new ContextException("Job removal failed.");
+        }
+    }
+
+  public String schedulePersistedJob(final JobDetails jobDetail, Date when) throws ContextException {
+    long ctime = System.currentTimeMillis();
+    if (when == null) {
+      when = new Date(ctime);
+    }
+
+    if (__log.isDebugEnabled()) {
+      __log.debug("scheduling " + jobDetail + " for " + when);
+    }
+
+    return schedulePersistedJob(jobDetail, true, when, ctime);
+  }
+
+  public String scheduleMapSerializableRunnable(MapSerializableRunnable runnable, Date when) throws ContextException {
+    long ctime = System.currentTimeMillis();
+    if (when == null) {
+      when = new Date(ctime);
+    }
+
+    JobDetails jobDetails = new JobDetails();
+    jobDetails.getDetailsExt().put("runnable", runnable);
+    runnable.storeToDetails(jobDetails);
+
+    if (__log.isDebugEnabled()) {
+      __log.debug("scheduling " + jobDetails + " for " + when);
+    }
+
+    return schedulePersistedJob(jobDetails, true, when, ctime);
+  }
+
+    private JobDAO insertJob(final boolean transacted, final JobDetails jobDetails, final long scheduledDate, final String nodeID, final boolean loaded, final boolean enqueue) throws ContextException {
+        SchedulerDAOConnection conn = _dbcf.getConnection();
+        final JobDAO job = conn.createJob(transacted, jobDetails, true, scheduledDate);
+        if (!conn.insertJob(job, nodeID, loaded)) {
+            String msg = String.format("Database insert failed. jobId %s nodeId %s", job.getJobId(), nodeID);
+            __log.error(msg);
+            throw new ContextException(msg);
+        }
+        if (enqueue) {
+            addTodoOnCommit(new JobDAOTask(job));
+        }
+        return job;
+    }
+
+  /**
+   * Persists a job, choosing its handling from how far {@code when} lies beyond {@code ctime}:
+   * immediate jobs are assigned to this node and enqueued, near-future jobs are only assigned
+   * to this node, and far-future jobs are stored without a node.
+   *
+   * @return the new job's id, or null when an immediate job is refused because the queue is full
+   */
+  public String schedulePersistedJob(JobDetails jobDetails, boolean transacted, Date when, long ctime) throws ContextException {
+    final long runAt = when.getTime();
+    final boolean immediate = runAt <= ctime + _immediateInterval;
+    final boolean nearFuture = !immediate && runAt <= ctime + _nearFutureInterval;
+
+    // If we have too many jobs in the queue, we don't allow any new ones
+    if (immediate && _todo.size() > _todoLimit) {
+      __log.error("The execution queue is backed up, the engine can't keep up with the load. Either "
+              + "increase the queue size or regulate the flow.");
+      return null;
+    }
+
+    // Immediate and near-future jobs are assigned to ourselves (this makes it very unlikely that two
+    // nodes try to process the same instance); far-future jobs get their node-id assigned later.
+    final String node = (immediate || nearFuture) ? _nodeId : null;
+    final JobDAO job = insertJob(transacted, jobDetails, runAt, node, immediate, immediate);
+    __log.debug((immediate ? "scheduled immediate job: " : nearFuture ? "scheduled near-future job: " : "scheduled far-future job: ") + job.getJobId());
+    return job.getJobId();
+  }
+
+    /** Creates a non-persisted job dated "now" and passes it to addTodoOnCommit; returns the job id. */
+    public String scheduleVolatileJob(final boolean transacted, final JobDetails jobDetail) throws ContextException {
+        final JobDAO volatileJob = _dbcf.getConnection().createJob(transacted, jobDetail, false, System.currentTimeMillis());
+        addTodoOnCommit(new JobDAOTask(volatileJob));
+        return volatileJob.getJobId();
+    }
+
+  public void setJobProcessor(JobProcessor processor) throws ContextException {
+    _jobProcessor = processor; // runJob() hands each scheduled job to this processor
+  }
+
+  public void shutdown() {
+    stop(); // returns immediately if the scheduler is not running
+    _jobProcessor = null;
+    _todo = null; // after this, start() would fail on the null queue
+  }
+
+  /** Starts the scheduler (no-op if already running): reloads the known nodes and queues the periodic tasks. */
+  public synchronized void start() {
+    if (_running) {
+      return;
+    }
+
+    // Clear any queued housekeeping tasks before they are enqueued again below.
+    _todo.clearTasks(UpgradeJobsTask.class);
+    _todo.clearTasks(LoadImmediateTask.class);
+    _todo.clearTasks(CheckStaleNodes.class);
+    _knownNodes.clear();
+
+    exec(new Callable<Boolean>() {
+      public Boolean call(SchedulerDAOConnection conn) throws ContextException {
+        final List<String> nodeIds = conn.getNodeIds();
+        if (nodeIds == null) {
+          __log.error("Error retrieving node list.");
+          throw new ContextException("Error retrieving node list.");
+        }
+        _knownNodes.addAll(nodeIds);
+        return null; // the result is not used by start()
+      }
+    });
+
+    // Pretend we got a heartbeat...
+    for (String node : _knownNodes) {
+      _lastHeartBeat.put(node, System.currentTimeMillis());
+    }
+
+    // schedule immediate job loading for now!
+    _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis()));
+
+    // schedule check for stale nodes, make it random so that the nodes don't overlap.
+    _todo.enqueue(new CheckStaleNodes(System.currentTimeMillis() + (long) (_random.nextDouble() * _staleInterval)));
+
+    // do the upgrade sometime (random) in the immediate interval.
+    _todo.enqueue(new UpgradeJobsTask(System.currentTimeMillis() + (long) (_random.nextDouble() * _immediateInterval)));
+
+    _todo.start();
+    _running = true;
+  }
+
+  /** Stops the task queue and clears the queued housekeeping tasks; does nothing when not running. */
+  public synchronized void stop() {
+    if (_running) {
+      _todo.stop();
+      // Remove the periodic tasks that start() enqueues.
+      _todo.clearTasks(UpgradeJobsTask.class);
+      _todo.clearTasks(LoadImmediateTask.class);
+      _todo.clearTasks(CheckStaleNodes.class);
+      _running = false;
+    }
+  }
+
+    public void jobCompleted(final String jobId) {
+        SchedulerDAOConnection conn = _dbcf.getConnection();
+        boolean deleted = conn.deleteJob(jobId, _nodeId);
         if (!deleted) {
-            try {
-                _txm.getTransaction().setRollbackOnly();
-            } catch (Exception ex) {
-                __log.error("Transaction manager error; setRollbackOnly() failed.", ex);
-            }
-
             throw new ContextException("Job no longer in database: jobId=" + jobId);
         }
     }
 
-
-    /**
-     * Run a job in the current thread.
-     *
-     * @param job
-     *            job to run.
-     */
-    protected void runJob(final Job job) {
-        final Scheduler.JobInfo jobInfo = new Scheduler.JobInfo(job.jobId, job.detail, job.detail.getRetryCount());
-
-        try {
-            try {
-                _jobProcessor.onScheduledJob(jobInfo);
-            } catch (JobProcessorException jpe) {
-                if (jpe.retry)
-                    __log.error("Error while processing transaction, retrying in " + doRetry(job) + "s");
-                else
-                    __log.error("Error while processing transaction, no retry.", jpe);
-            }
-        } catch (Exception ex) {
-            __log.error("Error in scheduler processor.", ex);
-        }
-
+  /**
+   * Run a job in the current thread.
+   *
+   * @param job
+   *            job to run.
+   */
+  protected void runJob(final JobDAO job) {
+    final Scheduler.JobInfo jobInfo = new Scheduler.JobInfo(job.getJobId(), job.getDetails(), job.getDetails().getRetryCount());
+
+    try {
+      try {
+        _jobProcessor.onScheduledJob(jobInfo);
+      } catch (JobProcessorException jpe) {
+        if (jpe.retry) {
+          __log.error("Error while processing transaction, retrying in " + doRetry(job) + "s");
+        } else {
+          __log.error("Error while processing transaction, no retry.", jpe);
+        }
+      }
+    } catch (Exception ex) {
+      __log.error("Error in scheduler processor.", ex);
     }
 
-    private void addTodoOnCommit(final Job job) {
+  }
+
+    private void addTodoOnCommit(final JobDAOTask job) {
 
         Transaction tx;
         try {
@@ -365,72 +314,75 @@ public class SimpleScheduler implements 
             throw new ContextException(errmsg, ex);
         }
 
-        if (tx == null)
-            throw new ContextException("Missing required transaction in thread " + Thread.currentThread());
-
-        try {
-            tx.registerSynchronization(new Synchronization() {
+        if (tx == null) {
+            _todo.enqueue(job);
+        } else {
+            try {
+                tx.registerSynchronization(new Synchronization() {
 
-                public void afterCompletion(int status) {
-                    if (status == Status.STATUS_COMMITTED) {
-                        _todo.enqueue(job);
+                    public void afterCompletion(int status) {
+                        if (status == Status.STATUS_COMMITTED) {
+                            _todo.enqueue(job);
+                        }
                     }
-                }
-
-                public void beforeCompletion() {
-                }
 
-            });
-
-        } catch (Exception e) {
-            String errmsg = "Unable to registrer synchronizer. ";
-            __log.error(errmsg, e);
-            throw new ContextException(errmsg, e);
+                    public void beforeCompletion() {
+                    }
+                });
+            } catch (Exception e) {
+                String errmsg = "Unable to registrer synchronizer. ";
+                __log.error(errmsg, e);
+                throw new ContextException(errmsg, e);
+            }
         }
     }
 
-    public void runTask(Task task) {
-        if (task instanceof Job)
-            runJob((Job) task);
-        if (task instanceof SchedulerTask)
-            ((SchedulerTask) task).run();
+  public void runTask(Task task) {
+    if (task instanceof JobDAOTask) {
+      runJob(((JobDAOTask) task).dao);
     }
+    if (task instanceof SchedulerTask) {
+      ((SchedulerTask) task).run();
+    }
+  }
 
-    public void updateHeartBeat(String nodeId) {
-        if (nodeId == null)
-            return;
-
-        if (_nodeId.equals(nodeId))
-            return;
+  public void updateHeartBeat(String nodeId) {
+    if (nodeId == null) {
+      return;
+    }
 
-        _lastHeartBeat.put(nodeId, System.currentTimeMillis());
-        _knownNodes.add(nodeId);
+    if (_nodeId.equals(nodeId)) {
+      return;
     }
 
+    _lastHeartBeat.put(nodeId, System.currentTimeMillis());
+    _knownNodes.add(nodeId);
+  }
+
     boolean doLoadImmediate() {
         __log.debug("LOAD IMMEDIATE started");
-        List<Job> jobs;
-        try {
-            do {
-                jobs = execTransaction(new Callable<List<Job>>() {
-                    public List<Job> call() throws Exception {
-                        return _db.dequeueImmediate(_nodeId, System.currentTimeMillis() + _immediateInterval, 10);
-                    }
-                });
-                for (Job j : jobs) {
-                    if (__log.isDebugEnabled())
-                        __log.debug("todo.enqueue job from db: " + j.jobId + " for " + j.schedDate);
 
-                    _todo.enqueue(j);
+        List<JobDAO> jobs;
+        do {
+            jobs = exec(new Callable<List<JobDAO>>() {
+                public List<JobDAO> call(SchedulerDAOConnection conn) throws ContextException {
+                    return conn.dequeueImmediate(_nodeId, System.currentTimeMillis() + _immediateInterval, 10);
                 }
-            } while (jobs.size() == 10);
-            return true;
-        } catch (Exception ex) {
-            __log.error("Error loading immediate jobs from database.", ex);
-            return false;
-        } finally {
-            __log.debug("LOAD IMMEDIATE complete");
-        }
+            });
+            if (jobs == null) {
+                __log.error("Error loading immediate jobs from database.");
+                return false;
+            }
+            for (JobDAO j : jobs) {
+                if (__log.isDebugEnabled()) {
+                    __log.debug("todo.enqueue job from db: " + j.getJobId() + " for " + j.getScheduledDate());
+                }
+                _todo.enqueue(new JobDAOTask(j));
+            }
+
+        } while (jobs.size() == 10);
+        __log.debug("LOAD IMMEDIATE complete");
+        return true;
     }
 
     boolean doUpgrade() {
@@ -446,154 +398,214 @@ public class SimpleScheduler implements 
         // of the time by the number of nodes to create the node assignment.
         // This can be done in a single update statement.
         final long maxtime = System.currentTimeMillis() + _nearFutureInterval;
-        try {
-            return execTransaction(new Callable<Boolean>() {
-
-                public Boolean call() throws Exception {
-                    int numNodes = knownNodes.size();
-                    for (int i = 0; i < numNodes; ++i) {
-                        String node = knownNodes.get(i);
-                        _db.updateAssignToNode(node, i, numNodes, maxtime);
+        return exec(new Callable<Boolean>() {
+            public Boolean call(SchedulerDAOConnection conn) throws ContextException {
+                int numNodes = knownNodes.size();
+                if (numNodes == -1) {
+                    __log.error("Database error upgrading jobs.");
+                    return false;
+                }
+                for (int i = 0; i < numNodes; ++i) {
+                    String node = knownNodes.get(i);
+                    if (conn.updateAssignToNode(node, i, numNodes, maxtime) == -1) {
+                        __log.error("updateAssignToNode failed");
+                        return false;
                     }
-                    return true;
                 }
-
-            });
-
-        } catch (Exception ex) {
-            __log.error("Database error upgrading jobs.", ex);
-            return false;
-        } finally {
-            __log.debug("UPGRADE complete");
-        }
-
+                __log.debug("UPGRADE complete");
+                return true;
+            }
+        });
     }
 
-    /**
-     * Re-assign stale node's jobs to self.
-     *
-     * @param nodeId
-     */
+  /**
+   * Re-assign stale node's jobs to self.
+   *
+   * @param nodeId
+   */
     void recoverStaleNode(final String nodeId) {
         __log.debug("recovering stale node " + nodeId);
-        try {
-            int numrows = execTransaction(new Callable<Integer>() {
+        int numrows = exec(new Callable<Integer>() {
 
-                public Integer call() throws Exception {
-                    return _db.updateReassign(nodeId, _nodeId);
+            public Integer call(SchedulerDAOConnection conn) throws ContextException {
+                int numrows = conn.updateReassign(nodeId, _nodeId);
+                if (numrows == -1) {
+                    __log.error("Database error reassigning node.");
+                    throw new ContextException("Database error reassigning node.");
                 }
+                __log.debug("reassigned " + numrows + " jobs to self. ");
+                return numrows;
 
-            });
+            }
+        });
 
-            __log.debug("reassigned " + numrows + " jobs to self. ");
+        // We can now forget about this node, if we see it again, it will be
+        // "new to us"
+        _knownNodes.remove(nodeId);
+        _lastHeartBeat.remove(nodeId);
 
-            // We can now forget about this node, if we see it again, it will be
-            // "new to us"
-            _knownNodes.remove(nodeId);
-            _lastHeartBeat.remove(nodeId);
+        // Force a load-immediate to catch anything new from the recovered node.
+        doLoadImmediate();
+        __log.debug("node recovery complete");
+    }
 
-            // Force a load-immediate to catch anything new from the recovered node.
-            doLoadImmediate();
+    private long doRetry(final JobDAO job) {
+        final int retry = job.getDetails().getRetryCount() + 1;
+        final long delay = (long) (Math.pow(5, retry - 1));
 
-        } catch (Exception ex) {
-            __log.error("Database error reassigning node.", ex);
-        } finally {
-            __log.debug("node recovery complete");
-        }
-    }
+        /*even though the JobDetails property is marked with Transient Hibernate
+        still treats any changes to it as a modification and will attempt to persist it.
+        for the new Job we will clone the existing details.*/
 
-    private long doRetry(Job job) throws DatabaseException {
-        int retry = job.detail.getRetryCount() + 1;
-        job.detail.setRetryCount(retry);
-        long delay = (long)(Math.pow(5, retry - 1));
-        Job jobRetry = new Job(System.currentTimeMillis() + delay*1000, true, job.detail);
-        _db.insertJob(jobRetry, _nodeId, false);
+        Scheduler.JobDetails newJobDetails;
+        try {
+            newJobDetails = job.getDetails().clone();
+        } catch (CloneNotSupportedException ce) {
+            __log.error("Unable to clone job details", ce);
+            return -1L;
+        }
+        newJobDetails.setRetryCount(newJobDetails.getRetryCount() + 1);
+        SchedulerDAOConnection conn = _dbcf.getConnection();
+        JobDAO jobRetry = conn.createJob(job.isTransacted(), newJobDetails, job.isPersisted(), System.currentTimeMillis() + delay * 1000);
+        if (!conn.insertJob(jobRetry, _nodeId, false)) {
+            __log.debug("retry failed for Job " + jobRetry.getJobId());
+            return -1L;
+        }
+        __log.debug("added retry for Job " + job.getJobId() + " at " + job.getScheduledDate() + " new Job: " + jobRetry.getJobId() + " at " + jobRetry.getScheduledDate());
         return delay;
     }
 
-    private abstract class SchedulerTask extends Task implements Runnable {
-        SchedulerTask(long schedDate) {
-            super(schedDate);
-        }
+  <T> T exec(Callable<T> callable) throws ContextException {
+        //Execute the work on the current thread
+      return callable.call();
     }
 
-    private class LoadImmediateTask extends SchedulerTask {
-
-        LoadImmediateTask(long schedDate) {
-            super(schedDate);
-        }
+    /**
+     * Wrapper for database transactions.
+     *
+     * @author Maciej Szefler
+     *
+     * @param <V>
+     *            return type
+     */
+    abstract class Callable<V> implements java.util.concurrent.Callable<V> {
 
-        public void run() {
+        public V call() throws ContextException {
             boolean success = false;
+            SchedulerDAOConnection conn = _dbcf.getConnection();
             try {
-                success = doLoadImmediate();
+                if (_txm != null) {
+                    _txm.begin();
+                }
+                V r = call(conn);
+                if (_txm != null) {
+                    _txm.commit();
+                }
+                success = true;
+                return r;
+            } catch (ContextException ce) {
+                throw ce;
+            } catch (Exception e) {
+                throw new ContextException("TxError", e);
             } finally {
-                if (success)
-                    _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis() + (long) (_immediateInterval * .75)));
-                else
-                    _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis() + 100));
+
+                try {
+                    if (!success && _txm != null ) {
+                        _txm.rollback();
+                    }
+                } catch (Exception ex) {
+                    __log.error("TxError", ex);
+                }
+                conn.close();
             }
+
         }
 
+        abstract V call(SchedulerDAOConnection conn) throws ContextException;
     }
 
-    /**
-     * Upgrade jobs from far future to immediate future (basically, assign them to a node).
-     *
-     * @author mszefler
-     *
-     */
-    private class UpgradeJobsTask extends SchedulerTask {
+  private abstract class SchedulerTask extends Task implements Runnable {
 
-        UpgradeJobsTask(long schedDate) {
-            super(schedDate);
-        }
+    SchedulerTask(long schedDate) {
+      super(schedDate);
+    }
+  }
 
-        public void run() {
-            long ctime = System.currentTimeMillis();
-            long ntime = _nextUpgrade.get();
-            __log.debug("UPGRADE task for " + schedDate + " fired at " + ctime);
-
-            // We could be too early, this can happen if upgrade gets delayed due to another
-            // node
-            if (_nextUpgrade.get() > System.currentTimeMillis()) {
-                __log.debug("UPGRADE skipped -- wait another " + (ntime - ctime) + "ms");
-                _todo.enqueue(new UpgradeJobsTask(ntime));
-                return;
-            }
+  private class LoadImmediateTask extends SchedulerTask {
 
-            boolean success = false;
-            try {
-                success = doUpgrade();
-            } finally {
-                long future = System.currentTimeMillis() + (success ? (long) (_nearFutureInterval * .50) : 100);
-                _nextUpgrade.set(future);
-                _todo.enqueue(new UpgradeJobsTask(future));
-                __log.debug("UPGRADE completed, success = " + success + "; next time in " + (future - ctime) + "ms");
-            }
+    LoadImmediateTask(long schedDate) {
+      super(schedDate);
+    }
+
+    public void run() {
+      boolean success = false;
+      try {
+        success = doLoadImmediate();
+      } finally {
+        if (success) {
+          _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis() + (long) (_immediateInterval * .75)));
+        } else {
+          _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis() + 100));
         }
+      }
+    }
+  }
+
+  /**
+   * Upgrade jobs from far future to immediate future (basically, assign them to a node).
+   *
+   * @author mszefler
+   *
+   */
+  private class UpgradeJobsTask extends SchedulerTask {
 
+    UpgradeJobsTask(long schedDate) {
+      super(schedDate);
     }
 
-    /**
-     * Check if any of the nodes in our cluster are stale.
-     */
-    private class CheckStaleNodes extends SchedulerTask {
+    public void run() {
+      long ctime = System.currentTimeMillis();
+      long ntime = _nextUpgrade.get();
+      __log.debug("UPGRADE task for " + schedDate + " fired at " + ctime);
 
-        CheckStaleNodes(long schedDate) {
-            super(schedDate);
-        }
+      // We could be too early, this can happen if upgrade gets delayed due to another
+      // node
+      if (_nextUpgrade.get() > System.currentTimeMillis()) {
+        __log.debug("UPGRADE skipped -- wait another " + (ntime - ctime) + "ms");
+        _todo.enqueue(new UpgradeJobsTask(ntime));
+        return;
+      }
 
-        public void run() {
-            _todo.enqueue(new CheckStaleNodes(System.currentTimeMillis() + _staleInterval));
-            __log.debug("CHECK STALE NODES started");
-            for (String nodeId : _knownNodes) {
-                Long lastSeen = _lastHeartBeat.get(nodeId);
-                if (lastSeen == null || (System.currentTimeMillis() - lastSeen) > _staleInterval)
-                    recoverStaleNode(nodeId);
-            }
-        }
+      boolean success = false;
+      try {
+        success = doUpgrade();
+      } finally {
+        long future = System.currentTimeMillis() + (success ? (long) (_nearFutureInterval * .50) : 100);
+        _nextUpgrade.set(future);
+        _todo.enqueue(new UpgradeJobsTask(future));
+        __log.debug("UPGRADE completed, success = " + success + "; next time in " + (future - ctime) + "ms");
+      }
+    }
+  }
 
+  /**
+   * Check if any of the nodes in our cluster are stale.
+   */
+  private class CheckStaleNodes extends SchedulerTask {
+
+    CheckStaleNodes(long schedDate) {
+      super(schedDate);
     }
 
+    public void run() {
+      _todo.enqueue(new CheckStaleNodes(System.currentTimeMillis() + _staleInterval)); // re-armed before the scan, so it stays scheduled even if the scan throws
+      __log.debug("CHECK STALE NODES started");
+      for (String nodeId : _knownNodes) {
+        Long lastSeen = _lastHeartBeat.get(nodeId); // null when no heartbeat is recorded for the node
+        if (lastSeen == null || (System.currentTimeMillis() - lastSeen) > _staleInterval) {
+          recoverStaleNode(nodeId); // NOTE(review): removes nodeId from _knownNodes during this iteration -- confirm the set tolerates that
+        }
+      }
+    }
+  }
 }

Added: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/TaskComparatorByDate.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/TaskComparatorByDate.java?rev=940263&view=auto
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/TaskComparatorByDate.java (added)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/TaskComparatorByDate.java Sun May  2 17:02:51 2010
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.scheduler.simple;
+
+import java.util.Comparator;
+
+/**
+ * Compare jobs, using scheduled date as sort criteria.
+ * 
+ * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
+ */
+class TaskComparatorByDate implements Comparator<Task> {
+
+    /** Orders tasks by ascending schedDate; compares directly, because a subtraction could overflow. */
+    public int compare(Task o1, Task o2) {
+        if (o1.schedDate < o2.schedDate) return -1;
+        if (o1.schedDate > o2.schedDate) return 1;
+        return 0;
+    }
+
+}

Modified: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/TaskRunner.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/TaskRunner.java?rev=940263&r1=940262&r2=940263&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/TaskRunner.java (original)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/TaskRunner.java Sun May  2 17:02:51 2010
@@ -19,6 +19,7 @@
 
 package org.apache.ode.scheduler.simple;
 
+
 /**
  * The thing that runs the scheduled tasks.
  * 
@@ -27,6 +28,5 @@ package org.apache.ode.scheduler.simple;
  */
 interface TaskRunner {
 
-    
     public void runTask(Task task);
 }

Added: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/jdbc/JobDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/jdbc/JobDAOImpl.java?rev=940263&view=auto
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/jdbc/JobDAOImpl.java (added)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/jdbc/JobDAOImpl.java Sun May  2 17:02:51 2010
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.scheduler.simple.jdbc;
+
+
+import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
+import org.apache.ode.dao.scheduler.JobDAO;
+import org.apache.ode.utils.GUID;
+
+/**
+ * Like a task, but a little bit better.
+ *
+ * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
+ */
+class JobDAOImpl implements JobDAO {
+    // State lives in package-private fields; the JobDAO getters below simply return them.
+    long scheduledDate;
+    String jobId;
+    boolean transacted;
+    JobDetails detail;
+    boolean persisted = true;
+
+    /** Creates a job with the given schedule time, id, transaction flag and details; persisted starts as true. */
+    public JobDAOImpl(long when, String jobId, boolean transacted, JobDetails jobDetail) {
+        this.jobId = jobId;
+        this.scheduledDate = when;
+        this.transacted = transacted;
+        this.detail = jobDetail;
+    }
+
+    public String getJobId() {
+        return jobId;
+    }
+
+    public boolean isTransacted() {
+        return transacted;
+    }
+
+    public JobDetails getDetails() {
+        return detail;
+    }
+
+    public boolean isPersisted() {
+        return persisted;
+    }
+
+    public long getScheduledDate() {
+        return scheduledDate;
+    }
+}

Added: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/jdbc/SchedulerDAOConnectionFactoryImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/jdbc/SchedulerDAOConnectionFactoryImpl.java?rev=940263&view=auto
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/jdbc/SchedulerDAOConnectionFactoryImpl.java (added)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/jdbc/SchedulerDAOConnectionFactoryImpl.java Sun May  2 17:02:51 2010
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.scheduler.simple.jdbc;
+
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+import org.apache.ode.dao.scheduler.SchedulerDAOConnection;
+import org.apache.ode.dao.scheduler.SchedulerDAOConnectionFactory;
+
+
+public class SchedulerDAOConnectionFactoryImpl implements SchedulerDAOConnectionFactory {
+  static ThreadLocal<SchedulerDAOConnection> _connections = new ThreadLocal<SchedulerDAOConnection>(); // per-thread connection; static, so shared by all factory instances
+  DataSource _ds;
+  TransactionManager _txm;
+  AtomicBoolean _active = new AtomicBoolean(true); // passed to every connection created here; cleared by shutdown()
+
+  public void init(Properties odeConfig, TransactionManager mgr, Object env) {
+    _ds = (DataSource) env; // env must be a DataSource; odeConfig is not used here
+    _txm = mgr;
+  }
+
+  public SchedulerDAOConnection getConnection() {
+    // Reuse this thread's connection unless there is none yet or it has been closed.
+    if (_connections.get()==null || _connections.get().isClosed() ){
+      _connections.set(new SchedulerDAOConnectionImpl(_active,_ds,_txm));
+    }
+    return _connections.get();
+  }
+
+  public void shutdown() {
+    _active.set(false);
+  }
+}

Added: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/jdbc/SchedulerDAOConnectionImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/jdbc/SchedulerDAOConnectionImpl.java?rev=940263&view=auto
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/jdbc/SchedulerDAOConnectionImpl.java (added)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/jdbc/SchedulerDAOConnectionImpl.java Sun May  2 17:02:51 2010
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.scheduler.simple.jdbc;
+
+import org.apache.ode.dao.scheduler.JobDAO;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.sql.DataSource;
+import javax.transaction.Status;
+import javax.transaction.Synchronization;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+
+import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
+import org.apache.ode.utils.DbIsolation;                                                                                                                                 
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.dao.scheduler.SchedulerDAOConnection;
+import org.apache.ode.utils.GUID;
+import org.apache.ode.utils.StreamUtils;
+
+/**
+ * JDBC-based implementation of the {@link SchedulerDAOConnection} interface. Should work with most
+ * reasonably behaved databases.
+ * 
+ * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
+ */
+public class SchedulerDAOConnectionImpl implements SchedulerDAOConnection {
+
+    private static final Log __log = LogFactory.getLog(SchedulerDAOConnectionImpl.class);
+
+    // Removes one job row; both the job id and the owning node id must match.
+    private static final String DELETE_JOB = "delete from ODE_JOB where jobid = ? and nodeid = ?";
+
+    // Hands every job of one node (2nd parameter) to another node (1st parameter) and
+    // clears the scheduled flag on those rows.
+    private static final String UPDATE_REASSIGN = "update ODE_JOB set nodeid = ?, scheduled = 0 where nodeid = ?";
+
+    // Claims unowned, unscheduled jobs for a node: rows whose ts modulo the 2nd parameter
+    // equals the 3rd parameter and whose ts is below the 4th parameter. Uses the mod() function.
+    private static final String UPGRADE_JOB_DEFAULT = "update ODE_JOB set nodeid = ? where nodeid is null and scheduled = 0 "
+            + "and mod(ts,?) = ? and ts < ?";
+
+    // Same statement as UPGRADE_JOB_DEFAULT written with the % operator; selected by
+    // updateAssignToNode when the detected dialect is SQLSERVER.
+    private static final String UPGRADE_JOB_SQLSERVER = "update ODE_JOB set nodeid = ? where nodeid is null and scheduled = 0 "
+        + "and (ts % ?) = ? and ts < ?";
+
+    // Inserts one job row. There are 15 placeholders, bound in column order by insertJob.
+    private static final String SAVE_JOB = "insert into ODE_JOB "
+            + " (jobid, nodeid, ts, scheduled, transacted, "
+            + "instanceId,"
+            + "mexId,"
+            + "processId,"
+            + "type,"
+            + "channel,"
+            + "correlatorId,"
+            + "correlationKey,"
+            + "retryCount,"
+            + "inMem,"
+            + "detailsExt"
+            + ") values(?, ?, ?, ?, ?,"
+            + "?,"
+            + "?,"
+            + "?,"
+            + "?,"
+            + "?,"
+            + "?,"
+            + "?,"
+            + "?,"
+            + "?,"
+            + "?"
+            + ")";
+
+    // Lists the distinct node ids present in the job table (may include a null row).
+    private static final String GET_NODEIDS = "select distinct nodeid from ODE_JOB";
+
+    // Selects the unscheduled jobs of one node whose ts is below the given bound, oldest first.
+    // dequeueImmediate reads jobid, ts and transacted by position (1, 2, 3) and the rest by name.
+    private static final String SCHEDULE_IMMEDIATE = "select jobid, ts, transacted, scheduled, "
+        + "instanceId,"
+        + "mexId,"
+        + "processId,"
+        + "type,"
+        + "channel,"
+        + "correlatorId,"
+        + "correlationKey,"
+        + "retryCount,"
+        + "inMem,"
+        + "detailsExt"
+        + " from ODE_JOB "
+            + "where nodeid = ? and scheduled = 0 and ts < ? order by ts";
+
+// Reference only (not compiled): these look like the Scheduler.JobDetails fields that the
+// extracted columns above correspond to - TODO confirm against Scheduler.JobDetails.
+//  public Long instanceId;
+//  public String mexId;
+//  public String processId;
+//  public String type;
+//  public String channel;
+//  public String correlatorId;
+//  public String correlationKey;
+//  public Integer retryCount;
+//  public Boolean inMem;
+//  public Map<String, Object> detailsExt = new HashMap<String, Object>();
+    
+    // Marks a batch of jobs as scheduled. The number of placeholders must stay equal to
+    // UPDATE_SCHEDULED_SLOTS; dequeueImmediate pads unused slots with empty strings.
+    private static final String UPDATE_SCHEDULED = "update ODE_JOB set scheduled = 1 where jobid in (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+    // Number of "?" placeholders in UPDATE_SCHEDULED.
+    private static final int UPDATE_SCHEDULED_SLOTS = 10;
+
+    // Source of the JDBC connections; one is borrowed and closed per operation.
+    private DataSource _ds;
+
+    // Supplied through the constructor; not referenced by the methods visible in this class.
+    private TransactionManager _txm;
+
+    // Flag object received from the creator; isClosed() reports its negation.
+    private AtomicBoolean _active;
+
+    // NOTE(review): never read or written by the methods visible in this class - confirm it is still needed.
+    private List<Runnable> _onCommits = new ArrayList<Runnable>();
+
+    // Database flavour detected once in the constructor by guessDialect().
+    private Dialect _dialect;
+    
+    public SchedulerDAOConnectionImpl(AtomicBoolean active, DataSource ds, TransactionManager txm) {
+        _active = active;
+        _ds = ds;
+        _txm = txm;
+        _dialect = guessDialect();
+    }
+
+    /**
+     * Deletes the job with the given id, provided it belongs to the given node.
+     *
+     * @param jobid id of the job to remove
+     * @param nodeId id of the node that must own the job
+     * @return {@code true} when exactly one row was deleted; {@code false} when no
+     *         single row matched or an SQL error occurred (the error is logged)
+     */
+    public boolean deleteJob(String jobid, String nodeId) {
+        if (__log.isDebugEnabled()) {
+            __log.debug("deleteJob " + jobid + " on node " + nodeId);
+        }
+
+        boolean deleted;
+        Connection connection = null;
+        PreparedStatement stmt = null;
+        try {
+            connection = getConnection();
+            stmt = connection.prepareStatement(DELETE_JOB);
+            stmt.setString(1, jobid);
+            stmt.setString(2, nodeId);
+            final int affectedRows = stmt.executeUpdate();
+            deleted = (affectedRows == 1);
+        } catch (SQLException se) {
+            __log.error("Error deleting job "+ jobid,se);
+            deleted = false;
+        } finally {
+            close(stmt);
+            close(connection);
+        }
+        return deleted;
+    }
+
+    /**
+     * Lists the distinct, non-null node ids that currently own rows in the job table.
+     *
+     * @return the node ids (possibly empty), or {@code null} when an SQL error
+     *         occurred (the error is logged)
+     */
+    public List<String> getNodeIds() {
+        Connection con = null;
+        PreparedStatement ps = null;
+        ResultSet rs = null;
+        try {
+            con = getConnection();
+            ps = con.prepareStatement(GET_NODEIDS, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+            rs = ps.executeQuery();
+            List<String> nodes = new ArrayList<String>();
+            while (rs.next()) {
+                // Read the column once and reuse the value; re-reading a column of the
+                // current row is unnecessary and not guaranteed by every driver.
+                String nodeId = rs.getString(1);
+                if (nodeId != null) {
+                    nodes.add(nodeId);
+                }
+            }
+            if (__log.isDebugEnabled())
+                __log.debug("getNodeIds: " + nodes);
+            return nodes;
+        } catch (SQLException se) {
+            __log.error("Error getting Node IDs ",se);
+            return null;
+        } finally {
+            if (rs != null) {
+                try {
+                    rs.close();
+                } catch (Exception e) {
+                    __log.warn("Exception while closing result set", e);
+                }
+            }
+            close(ps);
+            close(con);
+        }
+    }
+
+    /**
+     * Inserts a job row for the given node.
+     *
+     * @param job job to persist; its id, scheduled date, transacted flag and details are stored
+     * @param nodeId id of the owning node, written to the nodeid column
+     * @param loaded stored as 1/0 in the scheduled column
+     * @return {@code true} when exactly one row was inserted; {@code false} when the
+     *         extended details could not be serialized or an SQL error occurred
+     *         (both cases are logged)
+     */
+    public boolean insertJob(JobDAO job, String nodeId, boolean loaded) {
+        if (__log.isDebugEnabled())
+            __log.debug("insertJob " + job.getJobId() + " on node " + nodeId + " loaded=" + loaded);
+
+        Connection con = null;
+        PreparedStatement ps = null;
+        try {
+            int i = 1;
+            con = getConnection();
+            ps = con.prepareStatement(SAVE_JOB);
+            ps.setString(i++, job.getJobId());
+            ps.setString(i++, nodeId);
+            ps.setLong(i++, job.getScheduledDate());
+            ps.setInt(i++, asInteger(loaded));
+            ps.setInt(i++, asInteger(job.isTransacted()));
+
+            // Extracted detail columns; setObject with an explicit SQL type also accepts null values.
+            JobDetails details = job.getDetails();
+            ps.setObject(i++, details.instanceId, Types.BIGINT);
+            ps.setObject(i++, details.mexId, Types.VARCHAR);
+            ps.setObject(i++, details.processId, Types.VARCHAR);
+            ps.setObject(i++, details.type, Types.VARCHAR);
+            ps.setObject(i++, details.channel, Types.VARCHAR);
+            ps.setObject(i++, details.correlatorId, Types.VARCHAR);
+            ps.setObject(i++, details.correlationKey, Types.VARCHAR);
+            ps.setObject(i++, details.retryCount, Types.INTEGER);
+            ps.setObject(i++, details.inMem, Types.INTEGER);
+
+            // Remaining details are stored as a serialized blob, or NULL when there are none.
+            if (details.detailsExt == null || details.detailsExt.size() == 0) {
+                ps.setObject(i++, null, Types.BLOB);
+            } else {
+                ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                try {
+                    StreamUtils.write(bos, (Serializable) details.detailsExt);
+                } catch (Exception ex) {
+                    // Pass the exception along so the cause of the failure is not lost.
+                    __log.error("Error serializing job detail: " + job.getDetails(), ex);
+                    return false;
+                }
+                ps.setBytes(i++, bos.toByteArray());
+            }
+
+            return ps.executeUpdate() == 1;
+        } catch (SQLException se) {
+            __log.error("Error inserting Job " + job.getJobId(),se);
+            return false;
+        } finally {
+            close(ps);
+            close(con);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public List<JobDAO> dequeueImmediate(String nodeId, long maxtime, int maxjobs) {
+        ArrayList<JobDAO> ret = new ArrayList<JobDAO>(maxjobs);
+        Connection con = null;
+        PreparedStatement ps = null;
+        try {
+            con = getConnection();
+            ps = con.prepareStatement(SCHEDULE_IMMEDIATE);
+            ps.setString(1, nodeId);
+            ps.setLong(2, maxtime);
+            ps.setMaxRows(maxjobs);
+            
+            ResultSet rs = ps.executeQuery();
+            while (rs.next()) {
+                Scheduler.JobDetails details = new Scheduler.JobDetails();
+                details.instanceId = (Long) rs.getObject("instanceId");
+                details.mexId = (String) rs.getObject("mexId");
+                details.processId = (String) rs.getObject("processId");
+                details.type = (String) rs.getObject("type");
+                details.channel = (String) rs.getObject("channel");
+                details.correlatorId = (String) rs.getObject("correlatorId");
+                details.correlationKey = (String) rs.getObject("correlationKey");
+                details.retryCount = (Integer) rs.getObject("retryCount");
+                details.inMem = (Boolean) rs.getObject("inMem");
+                if (rs.getObject("detailsExt") != null) {
+                    try {
+                        ObjectInputStream is = new ObjectInputStream(rs.getBinaryStream("detailsExt"));
+                        details.detailsExt = (Map<String, Object>) is.readObject();
+                        is.close();
+                    } catch (Exception e) {
+                        __log.error("Error in dequeueImmediate ",e);
+                        return null;
+                    }
+                }
+                
+                {
+                    //For compatibility reasons, we check whether there are entries inside
+                    //jobDetailsExt blob, which correspond to extracted entries. If so, we
+                    //use them.
+
+                    Map<String, Object> detailsExt = details.getDetailsExt();
+                    if (detailsExt.get("type") != null) {
+                        details.type = (String) detailsExt.get("type");
+                    }
+                    if (detailsExt.get("iid") != null) {
+                        details.instanceId = (Long) detailsExt.get("iid");
+                    }
+                    if (detailsExt.get("pid") != null) {
+                        details.processId = (String) detailsExt.get("pid");
+                    }
+                    if (detailsExt.get("inmem") != null) {
+                        details.inMem = (Boolean) detailsExt.get("inmem");
+                    }
+                    if (detailsExt.get("ckey") != null) {
+                        details.correlationKey = (String) detailsExt.get("ckey");
+                    }
+                    if (detailsExt.get("channel") != null) {
+                        details.channel = (String) detailsExt.get("channel");
+                    }
+                    if (detailsExt.get("mexid") != null) {
+                        details.mexId = (String) detailsExt.get("mexid");
+                    }
+                    if (detailsExt.get("correlatorId") != null) {
+                        details.correlatorId = (String) detailsExt.get("correlatorId");
+                    }
+                    if (detailsExt.get("retryCount") != null) {
+                        details.retryCount = Integer.parseInt((String) detailsExt.get("retryCount"));
+                    }
+                }
+                
+                JobDAO job = new JobDAOImpl(rs.getLong(2), rs.getString(1), asBoolean(rs.getInt(3)), details);
+                ret.add(job);
+            }
+            rs.close();
+            ps.close();
+            
+            // mark jobs as scheduled, UPDATE_SCHEDULED_SLOTS at a time
+            int j = 0;
+            int updateCount = 0;
+            ps = con.prepareStatement(UPDATE_SCHEDULED);
+            for (int updates = 1; updates <= (ret.size() / UPDATE_SCHEDULED_SLOTS) + 1; updates++) {
+                for (int i = 1; i <= UPDATE_SCHEDULED_SLOTS; i++) {
+                    ps.setString(i, j < ret.size() ? ret.get(j).getJobId() : "");
+                    j++;
+                }
+                ps.execute();
+                updateCount += ps.getUpdateCount();
+            }
+            if (updateCount != ret.size()) {
+              __log.error("Updating scheduled jobs failed to update all jobs; expected=" + ret.size()
+                                + " actual=" + updateCount);
+             return null;
+              
+            }
+        } catch (SQLException se) {
+            __log.error("SQL error in dequeueImmediate",se);
+            return null;
+        } finally {
+            close(ps);
+            close(con);
+        }
+        return ret;
+    }
+
+    /**
+     * Moves every job owned by {@code oldnode} to {@code newnode} and clears the
+     * scheduled flag on the moved rows.
+     *
+     * @param oldnode id of the node giving up its jobs
+     * @param newnode id of the node receiving the jobs
+     * @return number of rows updated, or {@code -1} when an SQL error occurred (the error is logged)
+     */
+    public int updateReassign(String oldnode, String newnode) {
+        if (__log.isDebugEnabled()) {
+            __log.debug("updateReassign from " + oldnode + " ---> " + newnode);
+        }
+
+        int reassigned;
+        Connection connection = null;
+        PreparedStatement stmt = null;
+        try {
+            connection = getConnection();
+            stmt = connection.prepareStatement(UPDATE_REASSIGN);
+            stmt.setString(1, newnode);
+            stmt.setString(2, oldnode);
+            reassigned = stmt.executeUpdate();
+        } catch (SQLException se) {
+            __log.error("Error in updateReassign",se);
+            reassigned = -1;
+        } finally {
+            close(stmt);
+            close(connection);
+        }
+        return reassigned;
+    }
+
+    /**
+     * Assigns unowned, unscheduled jobs to a node: rows whose timestamp modulo
+     * {@code numNodes} equals {@code i} and whose timestamp is below {@code maxtime}.
+     *
+     * @param node id of the node that takes the jobs
+     * @param i remainder the job timestamp must have modulo {@code numNodes}
+     * @param numNodes divisor applied to the job timestamp
+     * @param maxtime exclusive upper bound on the job timestamp
+     * @return number of rows updated, or {@code -1} when an SQL error occurred (the error is logged)
+     */
+    public int updateAssignToNode(String node, int i, int numNodes, long maxtime) {
+        if (__log.isDebugEnabled()) {
+            __log.debug("updateAssignToNode node=" + node + " " + i + "/" + numNodes + " maxtime=" + maxtime);
+        }
+
+        // The SQL Server variant spells the modulo with "%" instead of mod().
+        final String sql = (_dialect == Dialect.SQLSERVER) ? UPGRADE_JOB_SQLSERVER : UPGRADE_JOB_DEFAULT;
+
+        int assigned;
+        Connection connection = null;
+        PreparedStatement stmt = null;
+        try {
+            connection = getConnection();
+            stmt = connection.prepareStatement(sql);
+            stmt.setString(1, node);
+            stmt.setInt(2, numNodes);
+            stmt.setInt(3, i);
+            stmt.setLong(4, maxtime);
+            assigned = stmt.executeUpdate();
+        } catch (SQLException se) {
+            __log.error("SQL error in updateAssignToNode",se);
+            assigned = -1;
+        } finally {
+            close(stmt);
+            close(connection);
+        }
+        return assigned;
+    }
+
+    /**
+     * Borrows a connection from the data source and applies the configured isolation level.
+     * The connection is closed again if applying the isolation level fails, so a failure
+     * here cannot leak it.
+     *
+     * @return an open connection the caller is responsible for closing
+     * @throws SQLException if the data source cannot supply a connection
+     */
+    private Connection getConnection() throws SQLException {
+        Connection c = _ds.getConnection();
+        boolean ready = false;
+        try {
+            DbIsolation.setIsolationLevel(c);
+            ready = true;
+            return c;
+        } finally {
+            if (!ready) {
+                // The caller never sees this connection, so it must be released here.
+                close(c);
+            }
+        }
+    }
+
+    /** Maps {@code true} to 1 and {@code false} to 0, the form flags are stored in. */
+    private int asInteger(boolean value) {
+        if (value) {
+            return 1;
+        }
+        return 0;
+    }
+
+    private boolean asBoolean(int value) {
+        return (value != 0);
+    }
+
+    /**
+     * Closes the statement if there is one; a failure to close is logged as a
+     * warning and otherwise ignored.
+     */
+    private void close(PreparedStatement ps) {
+        if (ps == null) {
+            return;
+        }
+        try {
+            ps.close();
+        } catch (Exception e) {
+            __log.warn("Exception while closing prepared statement", e);
+        }
+    }
+
+    /**
+     * Closes the connection if there is one; a failure to close is logged as a
+     * warning and otherwise ignored.
+     */
+    private void close(Connection con) {
+        if (con == null) {
+            return;
+        }
+        try {
+            con.close();
+        } catch (Exception e) {
+            __log.warn("Exception while closing connection", e);
+        }
+    }
+
+    /**
+     * Determines the database flavour from the product name reported by the JDBC
+     * metadata.
+     *
+     * @return the matching dialect, or {@link Dialect#UNKNOWN} when the product name is
+     *         missing or unrecognized, or the metadata could not be read (logged as a warning)
+     */
+    private Dialect guessDialect() {
+        Dialect d = Dialect.UNKNOWN;
+        Connection con = null;
+        try {
+            con = getConnection();
+            DatabaseMetaData metaData = con.getMetaData();
+            if (metaData != null) {
+                String dbProductName = metaData.getDatabaseProductName();
+                int dbMajorVer = metaData.getDatabaseMajorVersion();
+                __log.debug("Using database " + dbProductName + " major version " + dbMajorVer);
+                // A driver that reports no product name simply leaves the dialect UNKNOWN
+                // instead of failing with a NullPointerException.
+                String product = (dbProductName == null) ? "" : dbProductName;
+                if (product.indexOf("DB2") >= 0) {
+                    d = Dialect.DB2;
+                } else if (product.indexOf("Derby") >= 0) {
+                    d = Dialect.DERBY;
+                } else if (product.indexOf("Firebird") >= 0) {
+                    d = Dialect.FIREBIRD;
+                } else if (product.indexOf("HSQL") >= 0) {
+                    d = Dialect.HSQL;
+                } else if (product.indexOf("Microsoft SQL") >= 0) {
+                    d = Dialect.SQLSERVER;
+                } else if (product.indexOf("MySQL") >= 0) {
+                    d = Dialect.MYSQL;
+                } else if (product.indexOf("Oracle") >= 0) {
+                    // Dialect.ORACLE is declared, so report it when the product name says so.
+                    d = Dialect.ORACLE;
+                } else if (product.indexOf("Sybase") >= 0) {
+                    d = Dialect.SYBASE;
+                }
+            }
+        } catch (SQLException e) {
+            __log.warn("Unable to determine database dialect", e);
+        } finally {
+            close(con);
+        }
+        __log.debug("Using database dialect: " + d);
+        return d;
+    }
+
+  /**
+   * Creates an in-memory job object with a freshly generated id; nothing is written
+   * to the database here.
+   *
+   * @param transacted transacted flag of the new job
+   * @param jobDetails details attached to the new job
+   * @param persisted not used by this implementation
+   * @param scheduledDate scheduled date of the new job
+   * @return the new job
+   */
+  public JobDAO createJob(boolean transacted, JobDetails jobDetails, boolean persisted, long scheduledDate) {
+    final String jobId = new GUID().toString();
+    return new JobDAOImpl(scheduledDate, jobId, transacted, jobDetails);
+  }
+
+  /**
+   * Does nothing: this object holds no open JDBC resources between calls, since every
+   * operation in this class borrows its own connection and closes it before returning.
+   */
+  public void close() {
+    
+  }
+
+  /**
+   * Reports whether this connection should no longer be used.
+   *
+   * @return {@code true} once the active flag passed to the constructor has been set to {@code false}
+   */
+  public boolean isClosed() {
+    final boolean stillActive = _active.get();
+    return !stillActive;
+  }
+
+
+    /** Database flavours that {@code guessDialect()} can report; UNKNOWN is its fallback. */
+    enum Dialect {
+        DB2,
+        DERBY,
+        FIREBIRD,
+        HSQL,
+        MYSQL,
+        ORACLE,
+        SQLSERVER,
+        SYBASE,
+        UNKNOWN
+    }
+    
+}