You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by je...@apache.org on 2010/05/02 19:03:00 UTC
svn commit: r940263 [15/16] - in /ode/trunk: ./ axis2-war/
axis2-war/src/main/assembly/
axis2-war/src/main/webapp/WEB-INF/conf.hib-derby/
axis2-war/src/main/webapp/WEB-INF/conf.jpa-derby/
axis2-war/src/main/webapp/WEB-INF/conf/ axis2-war/src/test/java/...
Modified: ode/trunk/pom.xml
URL: http://svn.apache.org/viewvc/ode/trunk/pom.xml?rev=940263&r1=940262&r2=940263&view=diff
==============================================================================
--- ode/trunk/pom.xml (original)
+++ ode/trunk/pom.xml Sun May 2 17:02:51 2010
@@ -64,7 +64,8 @@
<wsdl4j.version>1.6.2</wsdl4j.version>
<woodstox.version>3.2.4</woodstox.version>
<javax.mail.version>1.4</javax.mail.version>
- <hibernate.version>3.3.1.GA</hibernate.version>
+ <hibernate.version>3.3.2.GA</hibernate.version>
+ <hibernate.entitymanager.version>3.4.0.GA</hibernate.entitymanager.version>
<javassist.version>3.4.GA</javassist.version>
<spring.version>2.5.6</spring.version>
<geronimo.specs.version>1.0</geronimo.specs.version>
@@ -88,7 +89,8 @@
<xmlbeans.version>2.3.0</xmlbeans.version>
<xstream.version>1.2</xstream.version>
<junit.version>4.4</junit.version>
- <hsqldb.version>1.8.0.7</hsqldb.version>
+ <hsqldb.version>1.8.0.10</hsqldb.version>
+ <h2.version>1.2.131</h2.version>
<persistence-api.version>1.0</persistence-api.version>
<xalan.version>2.7.1</xalan.version>
<ant.version>1.6.5</ant.version>
@@ -123,16 +125,18 @@
<module>dao-hibernate</module>
<module>tools</module>
<module>bpel-store</module>
- <module>dao-jpa-db</module>
+ <module>dao-jpa-db</module>
+ <module>dao-jpa-ojpa</module>
+ <module>dao-jpa-hibernate</module>
<module>dao-hibernate-db</module>
<module>engine</module>
<module>bpel-connector</module>
- <module>bpel-test</module>
- <module>extensions</module>
- <module>axis2</module>
- <module>jbi</module>
- <module>axis2-war</module>
- <module>distro</module>
+ <module>bpel-test</module>
+ <module>extensions</module>
+ <module>axis2</module>
+ <module>jbi</module>
+ <module>axis2-war</module>
+ <module>distro</module>
</modules>
<build>
@@ -199,7 +203,7 @@
<groupId>org.apache.derby</groupId>
<artifactId>derbytools</artifactId>
<version>${derby.version}</version>
- </dependency>
+ </dependency>
</dependencies>
</plugin>
@@ -386,6 +390,22 @@
<artifactId>ode-dao-jpa</artifactId>
<version>${ode.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.ode</groupId>
+ <artifactId>ode-dao-jpa</artifactId>
+ <version>${project.version}</version>
+ <classifier>openjpa</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.ode</groupId>
+ <artifactId>ode-dao-jpa-ojpa</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.ode</groupId>
+ <artifactId>ode-dao-jpa-hibernate</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.ode</groupId>
<artifactId>ode-jca-ra</artifactId>
@@ -703,7 +723,19 @@
</exclusion>
</exclusions>
</dependency>
-
+
+ <dependency>
+ <groupId>org.hibernate</groupId>
+ <artifactId>hibernate-entitymanager</artifactId>
+ <version>${hibernate.entitymanager.version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>jta</artifactId>
+ <groupId>javax.transaction</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
<dependency>
<groupId>javassist</groupId>
<artifactId>javassist</artifactId>
@@ -806,6 +838,12 @@
<artifactId>hsqldb</artifactId>
<version>${hsqldb.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.h2database</groupId>
+ <artifactId>h2</artifactId>
+ <version>${h2.version}</version>
+ </dependency>
+
<!-- AXIS2 dependencies -->
<dependency>
@@ -893,3 +931,5 @@
</dependencyManagement>
</project>
+
+
Modified: ode/trunk/scheduler-simple/pom.xml
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/pom.xml?rev=940263&r1=940262&r2=940263&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/pom.xml (original)
+++ ode/trunk/scheduler-simple/pom.xml Sun May 2 17:02:51 2010
@@ -19,77 +19,145 @@
-->
<project>
- <modelVersion>4.0.0</modelVersion>
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.ode</groupId>
+ <artifactId>ode-scheduler-simple</artifactId>
+ <name>ODE :: BPEL Scheduler Simple</name>
+ <parent>
<groupId>org.apache.ode</groupId>
- <artifactId>ode-scheduler-simple</artifactId>
- <name>ODE :: BPEL Scheduler Simple</name>
- <parent>
- <groupId>org.apache.ode</groupId>
- <artifactId>ode</artifactId>
- <version>2.0-SNAPSHOT</version>
- </parent>
+ <artifactId>ode</artifactId>
+ <version>2.0-SNAPSHOT</version>
+ </parent>
- <dependencies>
- <dependency>
- <groupId>org.apache.ode</groupId>
- <artifactId>ode-bpel-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.ode</groupId>
- <artifactId>ode-utils</artifactId>
- </dependency>
- <dependency>
- <groupId>commons-collections</groupId>
- <artifactId>commons-collections</artifactId>
- </dependency>
- <dependency>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.geronimo.specs</groupId>
- <artifactId>geronimo-jta_1.1_spec</artifactId>
- </dependency>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.ode</groupId>
+ <artifactId>ode-bpel-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.ode</groupId>
+ <artifactId>ode-bpel-dao</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.ode</groupId>
+ <artifactId>ode-il-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.ode</groupId>
+ <artifactId>ode-utils</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-jta_1.1_spec</artifactId>
+ </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>hsqldb</groupId>
- <artifactId>hsqldb</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>backport-util-concurrent</groupId>
- <artifactId>backport-util-concurrent</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.geronimo.modules</groupId>
- <artifactId>geronimo-kernel</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.geronimo.components</groupId>
- <artifactId>geronimo-transaction</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.geronimo.specs</groupId>
- <artifactId>geronimo-j2ee-connector_1.5_spec</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.geronimo.specs</groupId>
- <artifactId>geronimo-ejb_2.1_spec</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.h2database</groupId>
+ <artifactId>h2</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>backport-util-concurrent</groupId>
+ <artifactId>backport-util-concurrent</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.geronimo.modules</groupId>
+ <artifactId>geronimo-kernel</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.geronimo.components</groupId>
+ <artifactId>geronimo-transaction</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-j2ee-connector_1.5_spec</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-ejb_2.1_spec</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ode</groupId>
+ <artifactId>ode-dao-jpa-ojpa</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <!-- for the integration tests - seems to work with the openjpa enhanced enities -->
+ <dependency>
+ <groupId>org.apache.ode</groupId>
+ <artifactId>ode-dao-jpa-hibernate</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>hibernate-jpa</id>
+ <phase>integration-test</phase>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <reportsDirectory>${project.build.directory}/surefire-reports/hibernate-jpa</reportsDirectory>
+ <systemProperties>
+ <property>
+ <name>dao.factory.scheduler</name>
+ <value>org.apache.ode.dao.jpa.hibernate.SchedulerDAOConnectionFactoryImpl</value>
+ </property>
+ </systemProperties>
+ <excludes>
+ <exclude>**/SchedulerThreadTest.java</exclude>
+ </excludes>
+ </configuration>
+ </execution>
+ <execution>
+ <id>jdbc</id>
+ <phase>integration-test</phase>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <reportsDirectory>${project.build.directory}/surefire-reports/jdbc</reportsDirectory>
+ <systemProperties>
+ <property>
+ <name>dao.factory.scheduler</name>
+ <value>org.apache.ode.scheduler.simple.jdbc.SchedulerDAOConnectionFactoryImpl</value>
+ </property>
+ </systemProperties>
+ <excludes>
+ <exclude>**/SchedulerThreadTest.java</exclude>
+ </excludes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
Added: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JobDAOTask.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JobDAOTask.java?rev=940263&view=auto
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JobDAOTask.java (added)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JobDAOTask.java Sun May 2 17:02:51 2010
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.scheduler.simple;
+
+import org.apache.ode.dao.scheduler.JobDAO;
+
+/**
+ * The thing that we schedule.
+ *
+ * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
+ *
+ */
+class JobDAOTask extends Task {
+
+ public JobDAO dao;
+ public String jobId;
+
+ JobDAOTask(JobDAO job) {
+ super(job.getScheduledDate());
+ this.dao = job;
+ this.jobId=job.getJobId();
+ }
+
+ /**
+ * Used for dequeuing a Job without a lookup
+ * @param jobId
+ */
+ JobDAOTask(String jobId) {
+ super(0L);
+ this.jobId=jobId;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof JobDAOTask && jobId.equals(((JobDAOTask) obj).jobId);
+ }
+
+ @Override
+ public int hashCode() {
+ return jobId.hashCode();
+ }
+}
Modified: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java?rev=940263&r1=940262&r2=940263&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java (original)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java Sun May 2 17:02:51 2010
@@ -30,7 +30,7 @@ import org.apache.ode.utils.stl.Collecti
import org.apache.ode.utils.stl.MemberOfFunction;
/**
- * Implements the "todo" queue and prioritized scheduling mechanism.
+ * Implements the "todo" queue and prioritized scheduling mechanism.
*
* @author mszefler
* @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
@@ -58,7 +58,7 @@ class SchedulerThread implements Runnabl
SchedulerThread(TaskRunner runner) {
_todo = new PriorityBlockingQueue<Task>(TODO_QUEUE_INITIAL_CAPACITY,
- new JobComparatorByDate());
+ new TaskComparatorByDate());
_taskrunner = runner;
}
Modified: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java?rev=940263&r1=940262&r2=940263&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java (original)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java Sun May 2 17:02:51 2010
@@ -16,27 +16,24 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.ode.scheduler.simple;
import java.util.*;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicLong;
-
import javax.transaction.Status;
import javax.transaction.Synchronization;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
-import javax.xml.namespace.QName;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.ode.bpel.common.CorrelationKey;
import org.apache.ode.bpel.iapi.ContextException;
import org.apache.ode.bpel.iapi.Scheduler;
-import org.apache.ode.bpel.iapi.Scheduler.JobType;
+import org.apache.ode.dao.scheduler.JobDAO;
+import org.apache.ode.dao.scheduler.SchedulerDAOConnection;
+import org.apache.ode.dao.scheduler.SchedulerDAOConnectionFactory;
/**
* A reliable and relatively simple scheduler that uses a database to persist information about scheduled tasks.
@@ -56,305 +53,257 @@ import org.apache.ode.bpel.iapi.Schedule
*
*/
public class SimpleScheduler implements Scheduler, TaskRunner {
- private static final Log __log = LogFactory.getLog(SimpleScheduler.class);
-
- /**
- * Jobs scheduled with a time that is between [now, now+immediateInterval] will be assigned to the current node, and placed
- * directly on the todo queue.
- */
- long _immediateInterval = 30000;
-
- /**
- * Jobs sccheduled with a time that is between (now+immediateInterval,now+nearFutureInterval) will be assigned to the current
- * node, but will not be placed on the todo queue (the promoter will pick them up).
- */
- long _nearFutureInterval = 10 * 60 * 1000;
-
- /** 10s of no communication and you are deemed dead. */
- long _staleInterval = 10000;
-
- TransactionManager _txm;
-
- String _nodeId;
-
- /** Maximum number of jobs in the "near future" / todo queue. */
- int _todoLimit = 10000;
-
- /** The object that actually handles the jobs. */
- volatile JobProcessor _jobProcessor;
-
- volatile JobProcessor _polledRunnableProcessor;
-
- private SchedulerThread _todo;
-
- private DatabaseDelegate _db;
-
- /** All the nodes we know about */
- private CopyOnWriteArraySet<String> _knownNodes = new CopyOnWriteArraySet<String>();
-
- /** When we last heard from our nodes. */
- private ConcurrentHashMap<String, Long> _lastHeartBeat = new ConcurrentHashMap<String, Long>();
-
- private boolean _running;
-
- /** Time for next upgrade. */
- private AtomicLong _nextUpgrade = new AtomicLong();
-
- /** Time for next job load */
- private AtomicLong _nextScheduleImmediate = new AtomicLong();
-
- private Random _random = new Random();
-
- public SimpleScheduler(String nodeId, DatabaseDelegate del, Properties conf) {
- _nodeId = nodeId;
- _db = del;
- _todoLimit = Integer.parseInt(conf.getProperty("ode.scheduler.queueLength", "10000"));
- _todo = new SchedulerThread(this);
- }
-
- public void setNodeId(String nodeId) {
- _nodeId = nodeId;
- }
-
- public void setStaleInterval(long staleInterval) {
- _staleInterval = staleInterval;
- }
-
- public void setImmediateInterval(long immediateInterval) {
- _immediateInterval = immediateInterval;
- }
-
- public void setNearFutureInterval(long nearFutureInterval) {
- _nearFutureInterval = nearFutureInterval;
- }
-
- public void setTransactionManager(TransactionManager txm) {
- _txm = txm;
- }
-
- public void setDatabaseDelegate(DatabaseDelegate dbd) {
- _db = dbd;
- }
-
- public void setPolledRunnableProcesser(JobProcessor polledRunnableProcessor) {
- _polledRunnableProcessor = polledRunnableProcessor;
- }
-
- public void cancelJob(String jobId) throws ContextException {
- _todo.dequeue(new Job(0, jobId, false, null));
- try {
- _db.deleteJob(jobId, _nodeId);
- } catch (DatabaseException e) {
- __log.debug("Job removal failed.", e);
- throw new ContextException("Job removal failed.", e);
- }
- }
-
- public <T> T execTransaction(Callable<T> transaction) throws Exception, ContextException {
- try {
- if (__log.isDebugEnabled()) __log.debug("Beginning a new transaction");
- _txm.begin();
- } catch (Exception ex) {
- String errmsg = "Internal Error, could not begin transaction.";
- throw new ContextException(errmsg, ex);
- }
- boolean success = false;
- try {
- T retval = transaction.call();
- success = true;
- return retval;
- } catch (Exception ex) {
- throw ex;
- } finally {
- if (success) {
- if (__log.isDebugEnabled()) __log.debug("Commiting...");
- _txm.commit();
- } else {
- if (__log.isDebugEnabled()) __log.debug("Rollbacking...");
- _txm.rollback();
- }
- }
- }
-
- public String schedulePersistedJob(final JobDetails jobDetail, Date when) throws ContextException {
- long ctime = System.currentTimeMillis();
- if (when == null)
- when = new Date(ctime);
-
- if (__log.isDebugEnabled())
- __log.debug("scheduling " + jobDetail + " for " + when);
-
- return schedulePersistedJob(new Job(when.getTime(), true, jobDetail), when, ctime);
- }
-
- public String scheduleMapSerializableRunnable(MapSerializableRunnable runnable, Date when) throws ContextException {
- long ctime = System.currentTimeMillis();
- if (when == null)
- when = new Date(ctime);
-
- JobDetails jobDetails = new JobDetails();
- jobDetails.getDetailsExt().put("runnable", runnable);
- runnable.storeToDetails(jobDetails);
-
- if (__log.isDebugEnabled())
- __log.debug("scheduling " + jobDetails + " for " + when);
-
- return schedulePersistedJob(new Job(when.getTime(), true, jobDetails), when, ctime);
- }
-
- public String schedulePersistedJob(Job job, Date when, long ctime) throws ContextException {
- boolean immediate = when.getTime() <= ctime + _immediateInterval;
- boolean nearfuture = !immediate && when.getTime() <= ctime + _nearFutureInterval;
- try {
- if (immediate) {
- // If we have too many jobs in the queue, we don't allow any new ones
- if (_todo.size() > _todoLimit) {
- __log.error("The execution queue is backed up, the engine can't keep up with the load. Either " +
- "increase the queue size or regulate the flow.");
- return null;
- }
-
- // Immediate scheduling means we put it in the DB for safe keeping
- _db.insertJob(job, _nodeId, true);
- // And add it to our todo list .
- addTodoOnCommit(job);
-
- __log.debug("scheduled immediate job: " + job.jobId);
- } else if (nearfuture) {
- // Near future, assign the job to ourselves (why? -- this makes it very unlikely that we
- // would get two nodes trying to process the same instance, which causes unsightly rollbacks).
- _db.insertJob(job, _nodeId, false);
- __log.debug("scheduled near-future job: " + job.jobId);
- } else /* far future */{
- // Not the near future, we don't assign a node-id, we'll assign it later.
- _db.insertJob(job, null, false);
- __log.debug("scheduled far-future job: " + job.jobId);
- }
- } catch (DatabaseException dbe) {
- __log.error("Database error.", dbe);
- throw new ContextException("Database error.", dbe);
- }
- return job.jobId;
-
- }
-
- public String scheduleVolatileJob(boolean transacted, JobDetails jobDetail) throws ContextException {
- Job job = new Job(System.currentTimeMillis(), transacted, jobDetail);
- job.persisted = false;
- addTodoOnCommit(job);
- return job.toString();
- }
-
- public void setJobProcessor(JobProcessor processor) throws ContextException {
- _jobProcessor = processor;
- }
-
- public void shutdown() {
- stop();
- _jobProcessor = null;
- _txm = null;
- _todo = null;
- }
-
- public synchronized void start() {
- if (_running)
- return;
-
- _todo.clearTasks(UpgradeJobsTask.class);
- _todo.clearTasks(LoadImmediateTask.class);
- _todo.clearTasks(CheckStaleNodes.class);
-
- _knownNodes.clear();
-
- try {
- execTransaction(new Callable<Void>() {
-
- public Void call() throws Exception {
- _knownNodes.addAll(_db.getNodeIds());
- return null;
- }
-
- });
- } catch (Exception ex) {
- __log.error("Error retrieving node list.", ex);
- throw new ContextException("Error retrieving node list.", ex);
- }
-
- // Pretend we got a heartbeat...
- for (String s : _knownNodes)
- _lastHeartBeat.put(s, System.currentTimeMillis());
-
- // schedule immediate job loading for now!
- _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis()));
-
- // schedule check for stale nodes, make it random so that the nodes don't overlap.
- _todo.enqueue(new CheckStaleNodes(System.currentTimeMillis() + (long) (_random.nextDouble() * _staleInterval)));
-
- // do the upgrade sometime (random) in the immediate interval.
- _todo.enqueue(new UpgradeJobsTask(System.currentTimeMillis() + (long) (_random.nextDouble() * _immediateInterval)));
-
- _todo.start();
- _running = true;
- }
-
- public synchronized void stop() {
- if (!_running)
- return;
-
- _todo.stop();
- _todo.clearTasks(UpgradeJobsTask.class);
- _todo.clearTasks(LoadImmediateTask.class);
- _todo.clearTasks(CheckStaleNodes.class);
- _running = false;
- }
-
- public void jobCompleted(String jobId) {
- boolean deleted = false;
- try {
- deleted = _db.deleteJob(jobId, _nodeId);
- } catch (DatabaseException de) {
- String errmsg = "Database error.";
- __log.error(errmsg, de);
- throw new ContextException(errmsg, de);
- }
+ private static final Log __log = LogFactory.getLog(SimpleScheduler.class);
+ /**
+ * Jobs scheduled with a time that is between [now, now+immediateInterval] will be assigned to the current node, and placed
+ * directly on the todo queue.
+ */
+ long _immediateInterval = 30000;
+ /**
+ * Jobs sccheduled with a time that is between (now+immediateInterval,now+nearFutureInterval) will be assigned to the current
+ * node, but will not be placed on the todo queue (the promoter will pick them up).
+ */
+ long _nearFutureInterval = 10 * 60 * 1000;
+ /** 10s of no communication and you are deemed dead. */
+ long _staleInterval = 10000;
+ String _nodeId;
+ /** Maximum number of jobs in the "near future" / todo queue. */
+ int _todoLimit = 10000;
+ /** The object that actually handles the jobs. */
+ volatile JobProcessor _jobProcessor;
+ volatile JobProcessor _polledRunnableProcessor;
+ private SchedulerThread _todo;
+ private SchedulerDAOConnectionFactory _dbcf;
+ private TransactionManager _txm;
+ /** All the nodes we know about */
+ private CopyOnWriteArraySet<String> _knownNodes = new CopyOnWriteArraySet<String>();
+ /** When we last heard from our nodes. */
+ private ConcurrentHashMap<String, Long> _lastHeartBeat = new ConcurrentHashMap<String, Long>();
+ private boolean _running;
+ /** Time for next upgrade. */
+ private AtomicLong _nextUpgrade = new AtomicLong();
+ /** Time for next job load */
+ private AtomicLong _nextScheduleImmediate = new AtomicLong();
+ private Random _random = new Random();
+
+ public SimpleScheduler(String nodeId, SchedulerDAOConnectionFactory dbcf, TransactionManager mgr, Properties conf) {
+ _nodeId = nodeId;
+ _dbcf = dbcf;
+ _txm = mgr;
+ _todoLimit = Integer.parseInt(conf.getProperty("ode.scheduler.queueLength", "10000"));
+ _todo = new SchedulerThread(this);
+ }
+
+ public void setNodeId(String nodeId) {
+ _nodeId = nodeId;
+ }
+
+ public void setStaleInterval(long staleInterval) {
+ _staleInterval = staleInterval;
+ }
+
+ public void setImmediateInterval(long immediateInterval) {
+ _immediateInterval = immediateInterval;
+ }
+
+ public void setNearFutureInterval(long nearFutureInterval) {
+ _nearFutureInterval = nearFutureInterval;
+ }
+
+ public void setSchedulerDAOConnectionFactory(SchedulerDAOConnectionFactory dbcf) {
+ _dbcf = dbcf;
+ }
+
+ public void setPolledRunnableProcesser(JobProcessor polledRunnableProcessor) {
+ _polledRunnableProcessor = polledRunnableProcessor;
+ }
+
+ public void cancelJob(final String jobId) throws ContextException {
+ SchedulerDAOConnection conn = _dbcf.getConnection();
+ _todo.dequeue(new JobDAOTask(jobId));
+ if (!conn.deleteJob(jobId, _nodeId)) {
+ __log.debug("Job removal failed.");
+ throw new ContextException("Job removal failed.");
+ }
+ }
+
+ public String schedulePersistedJob(final JobDetails jobDetail, Date when) throws ContextException {
+ long ctime = System.currentTimeMillis();
+ if (when == null) {
+ when = new Date(ctime);
+ }
+
+ if (__log.isDebugEnabled()) {
+ __log.debug("scheduling " + jobDetail + " for " + when);
+ }
+
+ return schedulePersistedJob(jobDetail, true, when, ctime);
+ }
+
+ public String scheduleMapSerializableRunnable(MapSerializableRunnable runnable, Date when) throws ContextException {
+ long ctime = System.currentTimeMillis();
+ if (when == null) {
+ when = new Date(ctime);
+ }
+
+ JobDetails jobDetails = new JobDetails();
+ jobDetails.getDetailsExt().put("runnable", runnable);
+ runnable.storeToDetails(jobDetails);
+
+ if (__log.isDebugEnabled()) {
+ __log.debug("scheduling " + jobDetails + " for " + when);
+ }
+
+ return schedulePersistedJob(jobDetails, true, when, ctime);
+ }
+
+ private JobDAO insertJob(final boolean transacted, final JobDetails jobDetails, final long scheduledDate, final String nodeID, final boolean loaded, final boolean enqueue) throws ContextException {
+ SchedulerDAOConnection conn = _dbcf.getConnection();
+ final JobDAO job = conn.createJob(transacted, jobDetails, true, scheduledDate);
+ if (!conn.insertJob(job, nodeID, loaded)) {
+ String msg = String.format("Database insert failed. jobId %s nodeId %s", job.getJobId(), nodeID);
+ __log.error(msg);
+ throw new ContextException(msg);
+ }
+ if (enqueue) {
+ addTodoOnCommit(new JobDAOTask(job));
+ }
+ return job;
+ }
+
+ public String schedulePersistedJob(JobDetails jobDetails, boolean transacted, Date when, long ctime) throws ContextException {
+ boolean immediate = when.getTime() <= ctime + _immediateInterval;
+ boolean nearfuture = !immediate && when.getTime() <= ctime + _nearFutureInterval;
+ JobDAO job;
+
+ if (immediate) {
+ // If we have too many jobs in the queue, we don't allow any new ones
+ if (_todo.size() > _todoLimit) {
+ __log.error("The execution queue is backed up, the engine can't keep up with the load. Either "
+ + "increase the queue size or regulate the flow.");
+ return null;
+ }
+ job = insertJob(transacted, jobDetails, when.getTime(), _nodeId, true, true);
+ __log.debug("scheduled immediate job: " + job.getJobId());
+ } else if (nearfuture) {
+ // Near future, assign the job to ourselves (why? -- this makes it very unlikely that we
+ // would get two nodes trying to process the same instance, which causes unsightly rollbacks).
+ job = insertJob(transacted, jobDetails, when.getTime(), _nodeId, false, false);
+ __log.debug("scheduled near-future job: " + job.getJobId());
+ } else /* far future */ {
+ // Not the near future, we don't assign a node-id, we'll assign it later.
+ job = insertJob(transacted, jobDetails, when.getTime(), null, false, false);
+ __log.debug("scheduled far-future job: " + job.getJobId());
+ }
+ return job.getJobId();
+ }
+
+ public String scheduleVolatileJob(final boolean transacted, final JobDetails jobDetail) throws ContextException {
+ SchedulerDAOConnection conn = _dbcf.getConnection();
+ final JobDAO job = conn.createJob(transacted, jobDetail, false, System.currentTimeMillis());
+ addTodoOnCommit(new JobDAOTask(job));
+ return job.getJobId();
+ }
+
+ public void setJobProcessor(JobProcessor processor) throws ContextException {
+ _jobProcessor = processor;
+ }
+
+ public void shutdown() {
+ stop();
+ _jobProcessor = null;
+ _todo = null;
+ }
+
+ public synchronized void start() {
+ if (_running) {
+ return;
+ }
+
+ _todo.clearTasks(UpgradeJobsTask.class);
+ _todo.clearTasks(LoadImmediateTask.class);
+ _todo.clearTasks(CheckStaleNodes.class);
+
+ _knownNodes.clear();
+
+ exec(new Callable<Boolean>() {
+
+ public Boolean call(SchedulerDAOConnection conn) throws ContextException {
+ List<String> ids = conn.getNodeIds();
+ if (ids == null) {
+ __log.error("Error retrieving node list.");
+ throw new ContextException("Error retrieving node list.");
+ }
+ _knownNodes.addAll(ids);
+ return null;
+ }
+ });
+
+ // Pretend we got a heartbeat...
+ for (String s : _knownNodes) {
+ _lastHeartBeat.put(s, System.currentTimeMillis());
+ }
+
+ // schedule immediate job loading for now!
+ _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis()));
+
+ // schedule check for stale nodes, make it random so that the nodes don't overlap.
+ _todo.enqueue(new CheckStaleNodes(System.currentTimeMillis() + (long) (_random.nextDouble() * _staleInterval)));
+
+ // do the upgrade sometime (random) in the immediate interval.
+ _todo.enqueue(new UpgradeJobsTask(System.currentTimeMillis() + (long) (_random.nextDouble() * _immediateInterval)));
+
+ _todo.start();
+ _running = true;
+ }
+
+ public synchronized void stop() {
+ if (!_running) {
+ return;
+ }
+
+ _todo.stop();
+ _todo.clearTasks(UpgradeJobsTask.class);
+ _todo.clearTasks(LoadImmediateTask.class);
+ _todo.clearTasks(CheckStaleNodes.class);
+ _running = false;
+ }
+
+ public void jobCompleted(final String jobId) {
+ SchedulerDAOConnection conn = _dbcf.getConnection();
+ boolean deleted = conn.deleteJob(jobId, _nodeId);
if (!deleted) {
- try {
- _txm.getTransaction().setRollbackOnly();
- } catch (Exception ex) {
- __log.error("Transaction manager error; setRollbackOnly() failed.", ex);
- }
-
throw new ContextException("Job no longer in database: jobId=" + jobId);
}
}
-
- /**
- * Run a job in the current thread.
- *
- * @param job
- * job to run.
- */
- protected void runJob(final Job job) {
- final Scheduler.JobInfo jobInfo = new Scheduler.JobInfo(job.jobId, job.detail, job.detail.getRetryCount());
-
- try {
- try {
- _jobProcessor.onScheduledJob(jobInfo);
- } catch (JobProcessorException jpe) {
- if (jpe.retry)
- __log.error("Error while processing transaction, retrying in " + doRetry(job) + "s");
- else
- __log.error("Error while processing transaction, no retry.", jpe);
- }
- } catch (Exception ex) {
- __log.error("Error in scheduler processor.", ex);
- }
-
+ /**
+ * Run a job in the current thread.
+ *
+ * @param job
+ * job to run.
+ */
+ protected void runJob(final JobDAO job) {
+ final Scheduler.JobInfo jobInfo = new Scheduler.JobInfo(job.getJobId(), job.getDetails(), job.getDetails().getRetryCount());
+
+ try {
+ try {
+ _jobProcessor.onScheduledJob(jobInfo);
+ } catch (JobProcessorException jpe) {
+ if (jpe.retry) {
+ __log.error("Error while processing transaction, retrying in " + doRetry(job) + "s");
+ } else {
+ __log.error("Error while processing transaction, no retry.", jpe);
+ }
+ }
+ } catch (Exception ex) {
+ __log.error("Error in scheduler processor.", ex);
}
- private void addTodoOnCommit(final Job job) {
+ }
+
+ private void addTodoOnCommit(final JobDAOTask job) {
Transaction tx;
try {
@@ -365,72 +314,75 @@ public class SimpleScheduler implements
throw new ContextException(errmsg, ex);
}
- if (tx == null)
- throw new ContextException("Missing required transaction in thread " + Thread.currentThread());
-
- try {
- tx.registerSynchronization(new Synchronization() {
+ if (tx == null) {
+ _todo.enqueue(job);
+ } else {
+ try {
+ tx.registerSynchronization(new Synchronization() {
- public void afterCompletion(int status) {
- if (status == Status.STATUS_COMMITTED) {
- _todo.enqueue(job);
+ public void afterCompletion(int status) {
+ if (status == Status.STATUS_COMMITTED) {
+ _todo.enqueue(job);
+ }
}
- }
-
- public void beforeCompletion() {
- }
- });
-
- } catch (Exception e) {
- String errmsg = "Unable to registrer synchronizer. ";
- __log.error(errmsg, e);
- throw new ContextException(errmsg, e);
+ public void beforeCompletion() {
+ }
+ });
+ } catch (Exception e) {
+ String errmsg = "Unable to registrer synchronizer. ";
+ __log.error(errmsg, e);
+ throw new ContextException(errmsg, e);
+ }
}
}
- public void runTask(Task task) {
- if (task instanceof Job)
- runJob((Job) task);
- if (task instanceof SchedulerTask)
- ((SchedulerTask) task).run();
+ public void runTask(Task task) {
+ if (task instanceof JobDAOTask) {
+ runJob(((JobDAOTask) task).dao);
}
+ if (task instanceof SchedulerTask) {
+ ((SchedulerTask) task).run();
+ }
+ }
- public void updateHeartBeat(String nodeId) {
- if (nodeId == null)
- return;
-
- if (_nodeId.equals(nodeId))
- return;
+ public void updateHeartBeat(String nodeId) {
+ if (nodeId == null) {
+ return;
+ }
- _lastHeartBeat.put(nodeId, System.currentTimeMillis());
- _knownNodes.add(nodeId);
+ if (_nodeId.equals(nodeId)) {
+ return;
}
+ _lastHeartBeat.put(nodeId, System.currentTimeMillis());
+ _knownNodes.add(nodeId);
+ }
+
boolean doLoadImmediate() {
__log.debug("LOAD IMMEDIATE started");
- List<Job> jobs;
- try {
- do {
- jobs = execTransaction(new Callable<List<Job>>() {
- public List<Job> call() throws Exception {
- return _db.dequeueImmediate(_nodeId, System.currentTimeMillis() + _immediateInterval, 10);
- }
- });
- for (Job j : jobs) {
- if (__log.isDebugEnabled())
- __log.debug("todo.enqueue job from db: " + j.jobId + " for " + j.schedDate);
- _todo.enqueue(j);
+ List<JobDAO> jobs;
+ do {
+ jobs = exec(new Callable<List<JobDAO>>() {
+ public List<JobDAO> call(SchedulerDAOConnection conn) throws ContextException {
+ return conn.dequeueImmediate(_nodeId, System.currentTimeMillis() + _immediateInterval, 10);
}
- } while (jobs.size() == 10);
- return true;
- } catch (Exception ex) {
- __log.error("Error loading immediate jobs from database.", ex);
- return false;
- } finally {
- __log.debug("LOAD IMMEDIATE complete");
- }
+ });
+ if (jobs == null) {
+ __log.error("Error loading immediate jobs from database.");
+ return false;
+ }
+ for (JobDAO j : jobs) {
+ if (__log.isDebugEnabled()) {
+ __log.debug("todo.enqueue job from db: " + j.getJobId() + " for " + j.getScheduledDate());
+ }
+ _todo.enqueue(new JobDAOTask(j));
+ }
+
+ } while (jobs.size() == 10);
+ __log.debug("LOAD IMMEDIATE complete");
+ return true;
}
boolean doUpgrade() {
@@ -446,154 +398,214 @@ public class SimpleScheduler implements
// of the time by the number of nodes to create the node assignment.
// This can be done in a single update statement.
final long maxtime = System.currentTimeMillis() + _nearFutureInterval;
- try {
- return execTransaction(new Callable<Boolean>() {
-
- public Boolean call() throws Exception {
- int numNodes = knownNodes.size();
- for (int i = 0; i < numNodes; ++i) {
- String node = knownNodes.get(i);
- _db.updateAssignToNode(node, i, numNodes, maxtime);
+ return exec(new Callable<Boolean>() {
+ public Boolean call(SchedulerDAOConnection conn) throws ContextException {
+ int numNodes = knownNodes.size();
+ if (numNodes == -1) {
+ __log.error("Database error upgrading jobs.");
+ return false;
+ }
+ for (int i = 0; i < numNodes; ++i) {
+ String node = knownNodes.get(i);
+ if (conn.updateAssignToNode(node, i, numNodes, maxtime) == -1) {
+ __log.error("updateAssignToNode failed");
+ return false;
}
- return true;
}
-
- });
-
- } catch (Exception ex) {
- __log.error("Database error upgrading jobs.", ex);
- return false;
- } finally {
- __log.debug("UPGRADE complete");
- }
-
+ __log.debug("UPGRADE complete");
+ return true;
+ }
+ });
}
- /**
- * Re-assign stale node's jobs to self.
- *
- * @param nodeId
- */
+ /**
+ * Re-assign stale node's jobs to self.
+ *
+ * @param nodeId
+ */
void recoverStaleNode(final String nodeId) {
__log.debug("recovering stale node " + nodeId);
- try {
- int numrows = execTransaction(new Callable<Integer>() {
+ int numrows = exec(new Callable<Integer>() {
- public Integer call() throws Exception {
- return _db.updateReassign(nodeId, _nodeId);
+ public Integer call(SchedulerDAOConnection conn) throws ContextException {
+ int numrows = conn.updateReassign(nodeId, _nodeId);
+ if (numrows == -1) {
+ __log.error("Database error reassigning node.");
+ throw new ContextException("Database error reassigning node.");
}
+ __log.debug("reassigned " + numrows + " jobs to self. ");
+ return numrows;
- });
+ }
+ });
- __log.debug("reassigned " + numrows + " jobs to self. ");
+ // We can now forget about this node, if we see it again, it will be
+ // "new to us"
+ _knownNodes.remove(nodeId);
+ _lastHeartBeat.remove(nodeId);
- // We can now forget about this node, if we see it again, it will be
- // "new to us"
- _knownNodes.remove(nodeId);
- _lastHeartBeat.remove(nodeId);
+ // Force a load-immediate to catch anything new from the recovered node.
+ doLoadImmediate();
+ __log.debug("node recovery complete");
+ }
- // Force a load-immediate to catch anything new from the recovered node.
- doLoadImmediate();
+ private long doRetry(final JobDAO job) {
+ final int retry = job.getDetails().getRetryCount() + 1;
+ final long delay = (long) (Math.pow(5, retry - 1));
- } catch (Exception ex) {
- __log.error("Database error reassigning node.", ex);
- } finally {
- __log.debug("node recovery complete");
- }
- }
+ /*even though the JobDetails property is marked with Transient Hibernate
+ still treats any changes to it as a modification and will attempt to persist it.
+ for the new Job we will clone the existing details.*/
- private long doRetry(Job job) throws DatabaseException {
- int retry = job.detail.getRetryCount() + 1;
- job.detail.setRetryCount(retry);
- long delay = (long)(Math.pow(5, retry - 1));
- Job jobRetry = new Job(System.currentTimeMillis() + delay*1000, true, job.detail);
- _db.insertJob(jobRetry, _nodeId, false);
+ Scheduler.JobDetails newJobDetails;
+ try {
+ newJobDetails = job.getDetails().clone();
+ } catch (CloneNotSupportedException ce) {
+ __log.error("Unable to clone job details", ce);
+ return -1L;
+ }
+ newJobDetails.setRetryCount(newJobDetails.getRetryCount() + 1);
+ SchedulerDAOConnection conn = _dbcf.getConnection();
+ JobDAO jobRetry = conn.createJob(job.isTransacted(), newJobDetails, job.isPersisted(), System.currentTimeMillis() + delay * 1000);
+ if (!conn.insertJob(jobRetry, _nodeId, false)) {
+ __log.debug("retry failed for Job " + jobRetry.getJobId());
+ return -1L;
+ }
+ __log.debug("added retry for Job " + job.getJobId() + " at " + job.getScheduledDate() + " new Job: " + jobRetry.getJobId() + " at " + jobRetry.getScheduledDate());
return delay;
}
- private abstract class SchedulerTask extends Task implements Runnable {
- SchedulerTask(long schedDate) {
- super(schedDate);
- }
+ <T> T exec(Callable<T> callable) throws ContextException {
+ //Execute the work on the current thread
+ return callable.call();
}
- private class LoadImmediateTask extends SchedulerTask {
-
- LoadImmediateTask(long schedDate) {
- super(schedDate);
- }
+ /**
+ * Wrapper for database transactions.
+ *
+ * @author Maciej Szefler
+ *
+ * @param <V>
+ * return type
+ */
+ abstract class Callable<V> implements java.util.concurrent.Callable<V> {
- public void run() {
+ public V call() throws ContextException {
boolean success = false;
+ SchedulerDAOConnection conn = _dbcf.getConnection();
try {
- success = doLoadImmediate();
+ if (_txm != null) {
+ _txm.begin();
+ }
+ V r = call(conn);
+ if (_txm != null) {
+ _txm.commit();
+ }
+ success = true;
+ return r;
+ } catch (ContextException ce) {
+ throw ce;
+ } catch (Exception e) {
+ throw new ContextException("TxError", e);
} finally {
- if (success)
- _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis() + (long) (_immediateInterval * .75)));
- else
- _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis() + 100));
+
+ try {
+ if (!success && _txm != null ) {
+ _txm.rollback();
+ }
+ } catch (Exception ex) {
+ __log.error("TxError", ex);
+ }
+ conn.close();
}
+
}
+ abstract V call(SchedulerDAOConnection conn) throws ContextException;
}
- /**
- * Upgrade jobs from far future to immediate future (basically, assign them to a node).
- *
- * @author mszefler
- *
- */
- private class UpgradeJobsTask extends SchedulerTask {
+ private abstract class SchedulerTask extends Task implements Runnable {
- UpgradeJobsTask(long schedDate) {
- super(schedDate);
- }
+ SchedulerTask(long schedDate) {
+ super(schedDate);
+ }
+ }
- public void run() {
- long ctime = System.currentTimeMillis();
- long ntime = _nextUpgrade.get();
- __log.debug("UPGRADE task for " + schedDate + " fired at " + ctime);
-
- // We could be too early, this can happen if upgrade gets delayed due to another
- // node
- if (_nextUpgrade.get() > System.currentTimeMillis()) {
- __log.debug("UPGRADE skipped -- wait another " + (ntime - ctime) + "ms");
- _todo.enqueue(new UpgradeJobsTask(ntime));
- return;
- }
+ private class LoadImmediateTask extends SchedulerTask {
- boolean success = false;
- try {
- success = doUpgrade();
- } finally {
- long future = System.currentTimeMillis() + (success ? (long) (_nearFutureInterval * .50) : 100);
- _nextUpgrade.set(future);
- _todo.enqueue(new UpgradeJobsTask(future));
- __log.debug("UPGRADE completed, success = " + success + "; next time in " + (future - ctime) + "ms");
- }
+ LoadImmediateTask(long schedDate) {
+ super(schedDate);
+ }
+
+ public void run() {
+ boolean success = false;
+ try {
+ success = doLoadImmediate();
+ } finally {
+ if (success) {
+ _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis() + (long) (_immediateInterval * .75)));
+ } else {
+ _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis() + 100));
}
+ }
+ }
+ }
+
+ /**
+ * Upgrade jobs from far future to immediate future (basically, assign them to a node).
+ *
+ * @author mszefler
+ *
+ */
+ private class UpgradeJobsTask extends SchedulerTask {
+ UpgradeJobsTask(long schedDate) {
+ super(schedDate);
}
- /**
- * Check if any of the nodes in our cluster are stale.
- */
- private class CheckStaleNodes extends SchedulerTask {
+ public void run() {
+ long ctime = System.currentTimeMillis();
+ long ntime = _nextUpgrade.get();
+ __log.debug("UPGRADE task for " + schedDate + " fired at " + ctime);
- CheckStaleNodes(long schedDate) {
- super(schedDate);
- }
+ // We could be too early, this can happen if upgrade gets delayed due to another
+ // node
+ if (_nextUpgrade.get() > System.currentTimeMillis()) {
+ __log.debug("UPGRADE skipped -- wait another " + (ntime - ctime) + "ms");
+ _todo.enqueue(new UpgradeJobsTask(ntime));
+ return;
+ }
- public void run() {
- _todo.enqueue(new CheckStaleNodes(System.currentTimeMillis() + _staleInterval));
- __log.debug("CHECK STALE NODES started");
- for (String nodeId : _knownNodes) {
- Long lastSeen = _lastHeartBeat.get(nodeId);
- if (lastSeen == null || (System.currentTimeMillis() - lastSeen) > _staleInterval)
- recoverStaleNode(nodeId);
- }
- }
+ boolean success = false;
+ try {
+ success = doUpgrade();
+ } finally {
+ long future = System.currentTimeMillis() + (success ? (long) (_nearFutureInterval * .50) : 100);
+ _nextUpgrade.set(future);
+ _todo.enqueue(new UpgradeJobsTask(future));
+ __log.debug("UPGRADE completed, success = " + success + "; next time in " + (future - ctime) + "ms");
+ }
+ }
+ }
+ /**
+ * Check if any of the nodes in our cluster are stale.
+ */
+ private class CheckStaleNodes extends SchedulerTask {
+
+ CheckStaleNodes(long schedDate) {
+ super(schedDate);
}
+ public void run() {
+ _todo.enqueue(new CheckStaleNodes(System.currentTimeMillis() + _staleInterval));
+ __log.debug("CHECK STALE NODES started");
+ for (String nodeId : _knownNodes) {
+ Long lastSeen = _lastHeartBeat.get(nodeId);
+ if (lastSeen == null || (System.currentTimeMillis() - lastSeen) > _staleInterval) {
+ recoverStaleNode(nodeId);
+ }
+ }
+ }
+ }
}
Added: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/TaskComparatorByDate.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/TaskComparatorByDate.java?rev=940263&view=auto
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/TaskComparatorByDate.java (added)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/TaskComparatorByDate.java Sun May 2 17:02:51 2010
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.scheduler.simple;
+
+import java.util.Comparator;
+
+/**
+ * Compare jobs, using scheduled date as sort criteria.
+ *
+ * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
+ */
+class TaskComparatorByDate implements Comparator<Task> {
+
+ public int compare(Task o1, Task o2) {
+ long diff = o1.schedDate - o2.schedDate;
+ if (diff < 0) return -1;
+ if (diff > 0) return 1;
+ return 0;
+ }
+
+}
Modified: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/TaskRunner.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/TaskRunner.java?rev=940263&r1=940262&r2=940263&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/TaskRunner.java (original)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/TaskRunner.java Sun May 2 17:02:51 2010
@@ -19,6 +19,7 @@
package org.apache.ode.scheduler.simple;
+
/**
* The thing that runs the scheduled tasks.
*
@@ -27,6 +28,5 @@ package org.apache.ode.scheduler.simple;
*/
interface TaskRunner {
-
public void runTask(Task task);
}
Added: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/jdbc/JobDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/jdbc/JobDAOImpl.java?rev=940263&view=auto
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/jdbc/JobDAOImpl.java (added)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/jdbc/JobDAOImpl.java Sun May 2 17:02:51 2010
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.scheduler.simple.jdbc;
+
+
+import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
+import org.apache.ode.dao.scheduler.JobDAO;
+import org.apache.ode.utils.GUID;
+
+/**
+ * Like a task, but a little bit better.
+ *
+ * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
+ */
+class JobDAOImpl implements JobDAO {
+
+ long scheduledDate;
+ String jobId;
+ boolean transacted;
+ JobDetails detail;
+ boolean persisted = true;
+
+ public JobDAOImpl(long when, String jobId, boolean transacted, JobDetails jobDetail) {
+ this.scheduledDate = when;
+ this.jobId = jobId;
+ this.detail = jobDetail;
+ this.transacted = transacted;
+ }
+
+ public String getJobId() {
+ return jobId;
+ }
+
+ public boolean isTransacted() {
+ return transacted;
+ }
+
+ public JobDetails getDetails() {
+ return detail;
+ }
+
+ public boolean isPersisted() {
+ return persisted;
+ }
+
+ public long getScheduledDate() {
+ return scheduledDate;
+ }
+
+}
Added: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/jdbc/SchedulerDAOConnectionFactoryImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/jdbc/SchedulerDAOConnectionFactoryImpl.java?rev=940263&view=auto
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/jdbc/SchedulerDAOConnectionFactoryImpl.java (added)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/jdbc/SchedulerDAOConnectionFactoryImpl.java Sun May 2 17:02:51 2010
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.scheduler.simple.jdbc;
+
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+import org.apache.ode.dao.scheduler.SchedulerDAOConnection;
+import org.apache.ode.dao.scheduler.SchedulerDAOConnectionFactory;
+
+
+public class SchedulerDAOConnectionFactoryImpl implements SchedulerDAOConnectionFactory {
+ static ThreadLocal<SchedulerDAOConnection> _connections = new ThreadLocal<SchedulerDAOConnection>();
+ DataSource _ds;
+ TransactionManager _txm;
+ AtomicBoolean _active = new AtomicBoolean(true);
+
+ public void init(Properties odeConfig, TransactionManager mgr, Object env) {
+ _ds = (DataSource) env;
+ _txm = mgr;
+ }
+
+ public SchedulerDAOConnection getConnection() {
+ if (_connections.get()==null || _connections.get().isClosed() ){
+ _connections.set(new SchedulerDAOConnectionImpl(_active,_ds,_txm));
+ }
+ return _connections.get();
+
+ }
+
+ public void shutdown() {
+ _active.set(false);
+ }
+}
Added: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/jdbc/SchedulerDAOConnectionImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/jdbc/SchedulerDAOConnectionImpl.java?rev=940263&view=auto
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/jdbc/SchedulerDAOConnectionImpl.java (added)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/jdbc/SchedulerDAOConnectionImpl.java Sun May 2 17:02:51 2010
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.scheduler.simple.jdbc;
+
+import org.apache.ode.dao.scheduler.JobDAO;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.sql.DataSource;
+import javax.transaction.Status;
+import javax.transaction.Synchronization;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+
+import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
+import org.apache.ode.utils.DbIsolation;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.dao.scheduler.SchedulerDAOConnection;
+import org.apache.ode.utils.GUID;
+import org.apache.ode.utils.StreamUtils;
+
+/**
+ * JDBC-based implementation of the {@link DatabaseDelegate} interface. Should work with most
+ * reasonably behaved databases.
+ *
+ * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
+ */
+public class SchedulerDAOConnectionImpl implements SchedulerDAOConnection {
+
+ private static final Log __log = LogFactory.getLog(SchedulerDAOConnectionImpl.class);
+
+ private static final String DELETE_JOB = "delete from ODE_JOB where jobid = ? and nodeid = ?";
+
+ private static final String UPDATE_REASSIGN = "update ODE_JOB set nodeid = ?, scheduled = 0 where nodeid = ?";
+
+ private static final String UPGRADE_JOB_DEFAULT = "update ODE_JOB set nodeid = ? where nodeid is null and scheduled = 0 "
+ + "and mod(ts,?) = ? and ts < ?";
+
+ private static final String UPGRADE_JOB_SQLSERVER = "update ODE_JOB set nodeid = ? where nodeid is null and scheduled = 0 "
+ + "and (ts % ?) = ? and ts < ?";
+
+ private static final String SAVE_JOB = "insert into ODE_JOB "
+ + " (jobid, nodeid, ts, scheduled, transacted, "
+ + "instanceId,"
+ + "mexId,"
+ + "processId,"
+ + "type,"
+ + "channel,"
+ + "correlatorId,"
+ + "correlationKey,"
+ + "retryCount,"
+ + "inMem,"
+ + "detailsExt"
+ + ") values(?, ?, ?, ?, ?,"
+ + "?,"
+ + "?,"
+ + "?,"
+ + "?,"
+ + "?,"
+ + "?,"
+ + "?,"
+ + "?,"
+ + "?,"
+ + "?"
+ + ")";
+
+ private static final String GET_NODEIDS = "select distinct nodeid from ODE_JOB";
+
+ private static final String SCHEDULE_IMMEDIATE = "select jobid, ts, transacted, scheduled, "
+ + "instanceId,"
+ + "mexId,"
+ + "processId,"
+ + "type,"
+ + "channel,"
+ + "correlatorId,"
+ + "correlationKey,"
+ + "retryCount,"
+ + "inMem,"
+ + "detailsExt"
+ + " from ODE_JOB "
+ + "where nodeid = ? and scheduled = 0 and ts < ? order by ts";
+
+// public Long instanceId;
+// public String mexId;
+// public String processId;
+// public String type;
+// public String channel;
+// public String correlatorId;
+// public String correlationKey;
+// public Integer retryCount;
+// public Boolean inMem;
+// public Map<String, Object> detailsExt = new HashMap<String, Object>();
+
+ private static final String UPDATE_SCHEDULED = "update ODE_JOB set scheduled = 1 where jobid in (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+ private static final int UPDATE_SCHEDULED_SLOTS = 10;
+
+ private DataSource _ds;
+
+ private TransactionManager _txm;
+
+ private AtomicBoolean _active;
+
+ private List<Runnable> _onCommits = new ArrayList<Runnable>();
+
+ private Dialect _dialect;
+
+ public SchedulerDAOConnectionImpl(AtomicBoolean active, DataSource ds, TransactionManager txm) {
+ _active = active;
+ _ds = ds;
+ _txm = txm;
+ _dialect = guessDialect();
+ }
+
+ public boolean deleteJob(String jobid, String nodeId) {
+ if (__log.isDebugEnabled())
+ __log.debug("deleteJob " + jobid + " on node " + nodeId);
+
+ Connection con = null;
+ PreparedStatement ps = null;
+ try {
+ con = getConnection();
+ ps = con.prepareStatement(DELETE_JOB);
+ ps.setString(1, jobid);
+ ps.setString(2, nodeId);
+ return ps.executeUpdate() == 1;
+ } catch (SQLException se) {
+ __log.error("Error deleting job "+ jobid,se);
+ return false;
+ } finally {
+ close(ps);
+ close(con);
+ }
+ }
+
+ public List<String> getNodeIds() {
+ Connection con = null;
+ PreparedStatement ps = null;
+ try {
+ con = getConnection();
+ ps = con.prepareStatement(GET_NODEIDS, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ ResultSet rs = ps.executeQuery();
+ ArrayList<String> nodes = new ArrayList<String>();
+ while (rs.next()) {
+ String nodeId = rs.getString(1);
+ if (nodeId != null)
+ nodes.add(rs.getString(1));
+ }
+ if (__log.isDebugEnabled())
+ __log.debug("getNodeIds: " + nodes);
+ return nodes;
+ } catch (SQLException se) {
+ __log.error("Error getting Node IDs ",se);
+ return null;
+ } finally {
+ close(ps);
+ close(con);
+ }
+ }
+
+ public boolean insertJob(JobDAO job, String nodeId, boolean loaded) {
+ if (__log.isDebugEnabled())
+ __log.debug("insertJob " + job.getJobId() + " on node " + nodeId + " loaded=" + loaded);
+
+ Connection con = null;
+ PreparedStatement ps = null;
+ try {
+ int i = 1;
+ con = getConnection();
+ ps = con.prepareStatement(SAVE_JOB);
+ ps.setString(i++, job.getJobId());
+ ps.setString(i++, nodeId);
+ ps.setLong(i++, job.getScheduledDate());
+ ps.setInt(i++, asInteger(loaded));
+ ps.setInt(i++, asInteger(job.isTransacted()));
+
+ JobDetails details = job.getDetails();
+ ps.setObject(i++, details.instanceId, Types.BIGINT);
+ ps.setObject(i++, details.mexId, Types.VARCHAR);
+ ps.setObject(i++, details.processId, Types.VARCHAR);
+ ps.setObject(i++, details.type, Types.VARCHAR);
+ ps.setObject(i++, details.channel, Types.VARCHAR);
+ ps.setObject(i++, details.correlatorId, Types.VARCHAR);
+ ps.setObject(i++, details.correlationKey, Types.VARCHAR);
+ ps.setObject(i++, details.retryCount, Types.INTEGER);
+ ps.setObject(i++, details.inMem, Types.INTEGER);
+
+ if (details.detailsExt == null || details.detailsExt.size() == 0) {
+ ps.setObject(i++, null, Types.BLOB);
+ } else {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ try {
+ StreamUtils.write(bos, (Serializable) details.detailsExt);
+ } catch (Exception ex) {
+ __log.error("Error serializing job detail: " + job.getDetails());
+ return false;
+ }
+ ps.setBytes(i++, bos.toByteArray());
+ }
+
+ return ps.executeUpdate() == 1;
+ } catch (SQLException se) {
+ __log.error("Error inserting Job " + job.getJobId(),se);
+ return false;
+ } finally {
+ close(ps);
+ close(con);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public List<JobDAO> dequeueImmediate(String nodeId, long maxtime, int maxjobs) {
+ ArrayList<JobDAO> ret = new ArrayList<JobDAO>(maxjobs);
+ Connection con = null;
+ PreparedStatement ps = null;
+ try {
+ con = getConnection();
+ ps = con.prepareStatement(SCHEDULE_IMMEDIATE);
+ ps.setString(1, nodeId);
+ ps.setLong(2, maxtime);
+ ps.setMaxRows(maxjobs);
+
+ ResultSet rs = ps.executeQuery();
+ while (rs.next()) {
+ Scheduler.JobDetails details = new Scheduler.JobDetails();
+ details.instanceId = (Long) rs.getObject("instanceId");
+ details.mexId = (String) rs.getObject("mexId");
+ details.processId = (String) rs.getObject("processId");
+ details.type = (String) rs.getObject("type");
+ details.channel = (String) rs.getObject("channel");
+ details.correlatorId = (String) rs.getObject("correlatorId");
+ details.correlationKey = (String) rs.getObject("correlationKey");
+ details.retryCount = (Integer) rs.getObject("retryCount");
+ details.inMem = (Boolean) rs.getObject("inMem");
+ if (rs.getObject("detailsExt") != null) {
+ try {
+ ObjectInputStream is = new ObjectInputStream(rs.getBinaryStream("detailsExt"));
+ details.detailsExt = (Map<String, Object>) is.readObject();
+ is.close();
+ } catch (Exception e) {
+ __log.error("Error in dequeueImmediate ",e);
+ return null;
+ }
+ }
+
+ {
+ //For compatibility reasons, we check whether there are entries inside
+ //jobDetailsExt blob, which correspond to extracted entries. If so, we
+ //use them.
+
+ Map<String, Object> detailsExt = details.getDetailsExt();
+ if (detailsExt.get("type") != null) {
+ details.type = (String) detailsExt.get("type");
+ }
+ if (detailsExt.get("iid") != null) {
+ details.instanceId = (Long) detailsExt.get("iid");
+ }
+ if (detailsExt.get("pid") != null) {
+ details.processId = (String) detailsExt.get("pid");
+ }
+ if (detailsExt.get("inmem") != null) {
+ details.inMem = (Boolean) detailsExt.get("inmem");
+ }
+ if (detailsExt.get("ckey") != null) {
+ details.correlationKey = (String) detailsExt.get("ckey");
+ }
+ if (detailsExt.get("channel") != null) {
+ details.channel = (String) detailsExt.get("channel");
+ }
+ if (detailsExt.get("mexid") != null) {
+ details.mexId = (String) detailsExt.get("mexid");
+ }
+ if (detailsExt.get("correlatorId") != null) {
+ details.correlatorId = (String) detailsExt.get("correlatorId");
+ }
+ if (detailsExt.get("retryCount") != null) {
+ details.retryCount = Integer.parseInt((String) detailsExt.get("retryCount"));
+ }
+ }
+
+ JobDAO job = new JobDAOImpl(rs.getLong(2), rs.getString(1), asBoolean(rs.getInt(3)), details);
+ ret.add(job);
+ }
+ rs.close();
+ ps.close();
+
+ // mark jobs as scheduled, UPDATE_SCHEDULED_SLOTS at a time
+ int j = 0;
+ int updateCount = 0;
+ ps = con.prepareStatement(UPDATE_SCHEDULED);
+ for (int updates = 1; updates <= (ret.size() / UPDATE_SCHEDULED_SLOTS) + 1; updates++) {
+ for (int i = 1; i <= UPDATE_SCHEDULED_SLOTS; i++) {
+ ps.setString(i, j < ret.size() ? ret.get(j).getJobId() : "");
+ j++;
+ }
+ ps.execute();
+ updateCount += ps.getUpdateCount();
+ }
+ if (updateCount != ret.size()) {
+ __log.error("Updating scheduled jobs failed to update all jobs; expected=" + ret.size()
+ + " actual=" + updateCount);
+ return null;
+
+ }
+ } catch (SQLException se) {
+ __log.error("SQL error in dequeueImmediate",se);
+ return null;
+ } finally {
+ close(ps);
+ close(con);
+ }
+ return ret;
+ }
+
+ public int updateReassign(String oldnode, String newnode) {
+ if (__log.isDebugEnabled())
+ __log.debug("updateReassign from " + oldnode + " ---> " + newnode);
+ Connection con = null;
+ PreparedStatement ps = null;
+ try {
+ con = getConnection();
+ ps = con.prepareStatement(UPDATE_REASSIGN);
+ ps.setString(1, newnode);
+ ps.setString(2, oldnode);
+ return ps.executeUpdate();
+ } catch (SQLException se) {
+ __log.error("Error in updateReassign",se);
+ return -1;
+ } finally {
+ close(ps);
+ close(con);
+ }
+ }
+
+ public int updateAssignToNode(String node, int i, int numNodes, long maxtime) {
+ if (__log.isDebugEnabled())
+ __log.debug("updateAssignToNode node=" + node + " " + i + "/" + numNodes + " maxtime=" + maxtime);
+ Connection con = null;
+ PreparedStatement ps = null;
+ try {
+ con = getConnection();
+ if (_dialect == Dialect.SQLSERVER) {
+ ps = con.prepareStatement(UPGRADE_JOB_SQLSERVER);
+ } else {
+ ps = con.prepareStatement(UPGRADE_JOB_DEFAULT);
+ }
+ ps.setString(1, node);
+ ps.setInt(2, numNodes);
+ ps.setInt(3, i);
+ ps.setLong(4, maxtime);
+ return ps.executeUpdate();
+ } catch (SQLException se) {
+ __log.error("SQL error in updateAssignToNode",se);
+ return -1;
+ } finally {
+ close(ps);
+ close(con);
+ }
+ }
+
+ private Connection getConnection() throws SQLException {
+ Connection c = _ds.getConnection();
+ DbIsolation.setIsolationLevel(c);
+ return c;
+ }
+
+ private int asInteger(boolean value) {
+ return (value ? 1 : 0);
+ }
+
+ private boolean asBoolean(int value) {
+ return (value != 0);
+ }
+
+ private void close(PreparedStatement ps) {
+ if (ps != null) {
+ try {
+ ps.close();
+ } catch (Exception e) {
+ __log.warn("Exception while closing prepared statement", e);
+ }
+ }
+ }
+
+ private void close(Connection con) {
+ if (con != null) {
+ try {
+ con.close();
+ } catch (Exception e) {
+ __log.warn("Exception while closing connection", e);
+ }
+ }
+ }
+
+ private Dialect guessDialect() {
+ Dialect d = Dialect.UNKNOWN;
+ Connection con = null;
+ try {
+ con = getConnection();
+ DatabaseMetaData metaData = con.getMetaData();
+ if (metaData != null) {
+ String dbProductName = metaData.getDatabaseProductName();
+ int dbMajorVer = metaData.getDatabaseMajorVersion();
+ __log.debug("Using database " + dbProductName + " major version " + dbMajorVer);
+ if (dbProductName.indexOf("DB2") >= 0) {
+ d = Dialect.DB2;
+ } else if (dbProductName.indexOf("Derby") >= 0) {
+ d = Dialect.DERBY;
+ } else if (dbProductName.indexOf("Firebird") >= 0) {
+ d = Dialect.FIREBIRD;
+ } else if (dbProductName.indexOf("HSQL") >= 0) {
+ d = Dialect.HSQL;
+ } else if (dbProductName.indexOf("Microsoft SQL") >= 0) {
+ d = Dialect.SQLSERVER;
+ } else if (dbProductName.indexOf("MySQL") >= 0) {
+ d = Dialect.MYSQL;
+ } else if (dbProductName.indexOf("Sybase") >= 0) {
+ d = Dialect.SYBASE;
+ }
+ }
+ } catch (SQLException e) {
+ __log.warn("Unable to determine database dialect", e);
+ } finally {
+ close(con);
+ }
+ __log.debug("Using database dialect: " + d);
+ return d;
+ }
+
+ public JobDAO createJob(boolean transacted, JobDetails jobDetails, boolean persisted, long scheduledDate) {
+ return new JobDAOImpl(scheduledDate, new GUID().toString(), transacted, jobDetails);
+ }
+
+ public void close() {
+
+ }
+
+ public boolean isClosed() {
+ return !_active.get();
+ }
+
+
+ enum Dialect {
+ DB2, DERBY, FIREBIRD, HSQL, MYSQL, ORACLE, SQLSERVER, SYBASE, UNKNOWN
+ }
+
+}