You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by mr...@apache.org on 2008/04/10 02:18:06 UTC
svn commit: r646601 - in /ode/branches/APACHE_ODE_1.1/scheduler-simple/src:
main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
test/java/org/apache/ode/scheduler/simple/RetriesTest.java
Author: mriou
Date: Wed Apr 9 17:18:01 2008
New Revision: 646601
URL: http://svn.apache.org/viewvc?rev=646601&view=rev
Log:
Implemented retry in simple scheduler.
Added:
ode/branches/APACHE_ODE_1.1/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java
Modified:
ode/branches/APACHE_ODE_1.1/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
Modified: ode/branches/APACHE_ODE_1.1/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.1/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java?rev=646601&r1=646600&r2=646601&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.1/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java (original)
+++ ode/branches/APACHE_ODE_1.1/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java Wed Apr 9 17:18:01 2008
@@ -19,12 +19,7 @@
package org.apache.ode.scheduler.simple;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
+import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -71,7 +66,7 @@
* Jobs scheduled with a time that is between [now, now+immediateInterval] will be assigned to the current node, and placed
* directly on the todo queue.
*/
- long _immediateInterval = 60000;
+ long _immediateInterval = 30000;
/**
* Jobs sccheduled with a time that is between (now+immediateInterval,now+nearFutureInterval) will be assigned to the current
@@ -330,7 +325,8 @@
* job to run.
*/
protected void runJob(final Job job) {
- final Scheduler.JobInfo jobInfo = new Scheduler.JobInfo(job.jobId, job.detail, 0);
+ final Scheduler.JobInfo jobInfo = new Scheduler.JobInfo(job.jobId, job.detail,
+ (Integer)(job.detail.get("retry") != null ? job.detail.get("retry") : 0));
_exec.submit(new Callable<Void>() {
public Void call() throws Exception {
@@ -349,6 +345,13 @@
// This may happen if two node try to do the same job... we try to avoid
// it the synchronization is a best-effort but not perfect.
__log.debug("job no longer in db forced rollback.");
+ } catch (JobProcessorException jpe) {
+ if (jpe.retry) {
+ __log.error("Error while processing transaction, retrying.", jpe);
+ doRetry(job);
+ } else {
+ __log.error("Error while processing transaction, no retry.", jpe);
+ }
} catch (Exception ex) {
__log.error("Error while executing transaction", ex);
}
@@ -497,6 +500,14 @@
}
+ private void doRetry(Job job) throws DatabaseException {
+ Calendar retryTime = Calendar.getInstance();
+ retryTime.add(Calendar.SECOND, 2);
+ job.detail.put("retry", job.detail.get("retry") != null ? (((Integer)job.detail.get("retry")) + 1) : 1);
+ Job jobRetry = new Job(retryTime.getTime().getTime(), true, job.detail);
+ _db.insertJob(jobRetry, _nodeId, false);
+ }
+
private abstract class SchedulerTask extends Task implements Runnable {
SchedulerTask(long schedDate) {
super(schedDate);
@@ -551,7 +562,7 @@
try {
success = doUpgrade();
} finally {
- long future = System.currentTimeMillis() + (success ? (long) (_nearFutureInterval * .75) : 100);
+ long future = System.currentTimeMillis() + (success ? (long) (_nearFutureInterval * .50) : 100);
_nextUpgrade.set(future);
_todo.enqueue(new UpgradeJobsTask(future));
__log.debug("UPGRADE completed, success = " + success + "; next time in " + (future - ctime) + "ms");
Added: ode/branches/APACHE_ODE_1.1/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.1/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java?rev=646601&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.1/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java (added)
+++ ode/branches/APACHE_ODE_1.1/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java Wed Apr 9 17:18:01 2008
@@ -0,0 +1,73 @@
+package org.apache.ode.scheduler.simple;
+
+import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
+
+import javax.transaction.TransactionManager;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Date;
+
+import junit.framework.TestCase;
+
+/**
+ * @author Matthieu Riou <mr...@apache.org>
+ */
+public class RetriesTest extends TestCase implements Scheduler.JobProcessor {
+ DelegateSupport _ds;
+ SimpleScheduler _scheduler;
+ ArrayList<Scheduler.JobInfo> _jobs;
+ ArrayList<Scheduler.JobInfo> _commit;
+ TransactionManager _txm;
+ int _tried = 0;
+
+ public void setUp() throws Exception {
+ _txm = new GeronimoTransactionManager();
+ _ds = new DelegateSupport();
+
+ _scheduler = newScheduler("n1");
+ _jobs = new ArrayList<Scheduler.JobInfo>(100);
+ _commit = new ArrayList<Scheduler.JobInfo>(100);
+ }
+
+ public void tearDown() throws Exception {
+ _scheduler.shutdown();
+ }
+
+ public void testRetries() throws Exception {
+ // speed things up a bit to hit the right code paths
+ _scheduler.setNearFutureInterval(5000);
+ _scheduler.setImmediateInterval(1000);
+ _scheduler.start();
+ _txm.begin();
+ try {
+ _scheduler.schedulePersistedJob(newDetail("123"), new Date());
+ } finally {
+ _txm.commit();
+ }
+
+ Thread.sleep(5000);
+ assertEquals(3, _tried);
+ }
+
+
+ public void onScheduledJob(Scheduler.JobInfo jobInfo) throws Scheduler.JobProcessorException {
+ _tried++;
+ throw new Scheduler.JobProcessorException(jobInfo.retryCount < 2);
+ }
+
+ Map<String, Object> newDetail(String x) {
+ HashMap<String, Object> det = new HashMap<String, Object>();
+ det.put("foo", x);
+ return det;
+ }
+
+ private SimpleScheduler newScheduler(String nodeId) {
+ SimpleScheduler scheduler = new SimpleScheduler(nodeId, _ds.delegate());
+ scheduler.setJobProcessor(this);
+ scheduler.setTransactionManager(_txm);
+ return scheduler;
+ }
+
+}