You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by mr...@apache.org on 2008/04/10 02:18:06 UTC

svn commit: r646601 - in /ode/branches/APACHE_ODE_1.1/scheduler-simple/src: main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java test/java/org/apache/ode/scheduler/simple/RetriesTest.java

Author: mriou
Date: Wed Apr  9 17:18:01 2008
New Revision: 646601

URL: http://svn.apache.org/viewvc?rev=646601&view=rev
Log:
Implemented retry in simple scheduler.

Added:
    ode/branches/APACHE_ODE_1.1/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java
Modified:
    ode/branches/APACHE_ODE_1.1/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java

Modified: ode/branches/APACHE_ODE_1.1/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.1/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java?rev=646601&r1=646600&r2=646601&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.1/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java (original)
+++ ode/branches/APACHE_ODE_1.1/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java Wed Apr  9 17:18:01 2008
@@ -19,12 +19,7 @@
 
 package org.apache.ode.scheduler.simple;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
+import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
@@ -71,7 +66,7 @@
      * Jobs scheduled with a time that is between [now, now+immediateInterval] will be assigned to the current node, and placed
      * directly on the todo queue.
      */
-    long _immediateInterval = 60000;
+    long _immediateInterval = 30000;
 
     /**
      * Jobs sccheduled with a time that is between (now+immediateInterval,now+nearFutureInterval) will be assigned to the current
@@ -330,7 +325,8 @@
      *            job to run.
      */
     protected void runJob(final Job job) {
-        final Scheduler.JobInfo jobInfo = new Scheduler.JobInfo(job.jobId, job.detail, 0);
+        final Scheduler.JobInfo jobInfo = new Scheduler.JobInfo(job.jobId, job.detail,
+                (Integer)(job.detail.get("retry") != null ? job.detail.get("retry") : 0));
 
         _exec.submit(new Callable<Void>() {
             public Void call() throws Exception {
@@ -349,6 +345,13 @@
                         // This may happen if two node try to do the same job... we try to avoid
                         // it the synchronization is a best-effort but not perfect.
                         __log.debug("job no longer in db forced rollback.");
+                    } catch (JobProcessorException jpe) {
+                        if (jpe.retry) {
+                            __log.error("Error while processing transaction, retrying.", jpe);
+                            doRetry(job);
+                        } else {
+                            __log.error("Error while processing transaction, no retry.", jpe);
+                        }
                     } catch (Exception ex) {
                         __log.error("Error while executing transaction", ex);
                     }
@@ -497,6 +500,14 @@
 
     }
 
+    private void doRetry(Job job) throws DatabaseException {
+        Calendar retryTime = Calendar.getInstance();
+        retryTime.add(Calendar.SECOND, 2);
+        job.detail.put("retry", job.detail.get("retry") != null ? (((Integer)job.detail.get("retry")) + 1) : 1);
+        Job jobRetry = new Job(retryTime.getTime().getTime(), true, job.detail);
+        _db.insertJob(jobRetry, _nodeId, false);
+    }
+
     private abstract class SchedulerTask extends Task implements Runnable {
         SchedulerTask(long schedDate) {
             super(schedDate);
@@ -551,7 +562,7 @@
             try {
                 success = doUpgrade();
             } finally {
-                long future = System.currentTimeMillis() + (success ? (long) (_nearFutureInterval * .75) : 100);
+                long future = System.currentTimeMillis() + (success ? (long) (_nearFutureInterval * .50) : 100);
                 _nextUpgrade.set(future);
                 _todo.enqueue(new UpgradeJobsTask(future));
                 __log.debug("UPGRADE completed, success = " + success + "; next time in " + (future - ctime) + "ms");

Added: ode/branches/APACHE_ODE_1.1/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.1/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java?rev=646601&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.1/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java (added)
+++ ode/branches/APACHE_ODE_1.1/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java Wed Apr  9 17:18:01 2008
@@ -0,0 +1,73 @@
+package org.apache.ode.scheduler.simple;
+
+import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
+
+import javax.transaction.TransactionManager;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Date;
+
+import junit.framework.TestCase;
+
+/**
+ * @author Matthieu Riou <mr...@apache.org>
+ */
+public class RetriesTest extends TestCase implements Scheduler.JobProcessor {
+    DelegateSupport _ds;
+    SimpleScheduler _scheduler;
+    ArrayList<Scheduler.JobInfo> _jobs;
+    ArrayList<Scheduler.JobInfo> _commit;
+    TransactionManager _txm;
+    int _tried = 0;
+
+    public void setUp() throws Exception {
+        _txm = new GeronimoTransactionManager();
+        _ds = new DelegateSupport();
+
+        _scheduler = newScheduler("n1");
+        _jobs = new ArrayList<Scheduler.JobInfo>(100);
+        _commit = new ArrayList<Scheduler.JobInfo>(100);
+    }
+
+    public void tearDown() throws Exception {
+        _scheduler.shutdown();
+    }
+    
+    public void testRetries() throws Exception {
+        // speed things up a bit to hit the right code paths
+        _scheduler.setNearFutureInterval(5000);
+        _scheduler.setImmediateInterval(1000);
+        _scheduler.start();
+        _txm.begin();
+        try {
+            _scheduler.schedulePersistedJob(newDetail("123"), new Date());
+        } finally {
+            _txm.commit();
+        }
+
+        Thread.sleep(5000);
+        assertEquals(3, _tried);
+    }
+
+
+    public void onScheduledJob(Scheduler.JobInfo jobInfo) throws Scheduler.JobProcessorException {
+        _tried++;
+        throw new Scheduler.JobProcessorException(jobInfo.retryCount < 2);
+    }
+
+    Map<String, Object> newDetail(String x) {
+        HashMap<String, Object> det = new HashMap<String, Object>();
+        det.put("foo", x);
+        return det;
+    }
+
+    private SimpleScheduler newScheduler(String nodeId) {
+        SimpleScheduler scheduler = new SimpleScheduler(nodeId, _ds.delegate());
+        scheduler.setJobProcessor(this);
+        scheduler.setTransactionManager(_txm);
+        return scheduler;
+    }
+
+}