You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2007/01/25 19:44:42 UTC

svn commit: r499891 - /incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java

Author: mszefler
Date: Thu Jan 25 10:44:41 2007
New Revision: 499891

URL: http://svn.apache.org/viewvc?view=rev&rev=499891
Log:
Instance locking for scheduled jobs.

Modified:
    incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java

Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java?view=diff&rev=499891&r1=499890&r2=499891
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java Thu Jan 25 10:44:41 2007
@@ -26,6 +26,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.TimeUnit;
 
 import javax.wsdl.Operation;
 import javax.wsdl.PortType;
@@ -54,6 +55,8 @@
 import org.apache.ode.bpel.o.OProcess;
 import org.apache.ode.utils.msg.MessageBundle;
 
+import com.sun.corba.se.impl.orbutil.threadpool.TimeoutException;
+
 /**
  * Implementation of the {@link BpelEngine} interface: provides the server methods that should be invoked in the context of a
  * transaction.
@@ -96,6 +99,9 @@
     /** Mapping from myrole endpoint name to active process. */
     private final HashMap<Endpoint, BpelProcess> _serviceMap = new HashMap<Endpoint, BpelProcess>();
 
+    /** Manage instance-level locks. */
+    private final InstanceLockManager _instanceLockManager = new InstanceLockManager();
+    
     final Contexts _contexts;
 
     public BpelEngineImpl(Contexts contexts) {
@@ -242,9 +248,29 @@
     public void onScheduledJob(Scheduler.JobInfo jobInfo) throws Scheduler.JobProcessorException {
         WorkEvent we = new WorkEvent(jobInfo.jobDetail);
 
-        // NOTE: wrap this method real tight in a try/catch block, we need to handle all types of 
-        // failure here, the scheduler is not going to know how to handle our errors. 
+        // We lock the instance to prevent concurrent transactions and avoid unnecessary rollbacks.
+        // Note that we don't want to wait too long here to get our lock, since we are likely holding
+        // on to the scheduler's locks of various sorts.
+        try {
+            _instanceLockManager.lock(we.getIID(), 1, TimeUnit.MICROSECONDS);
+        } catch (InterruptedException e) {
+            // Retry later.
+            __log.debug("Thread interrupted, job will be rescheduled: " + jobInfo);
+            throw new Scheduler.JobProcessorException(true);
+        } catch (org.apache.ode.bpel.engine.InstanceLockManager.TimeoutException e) {
+            __log.debug("Instance " + we.getIID() + " is busy, rescheduling job.");
+            // TODO: This should really be something more like the exponential backoff algorithm used in
+            // Ethernet.
+            _contexts.scheduler.schedulePersistedJob(jobInfo.jobDetail, 
+                    new Date(System.currentTimeMillis() + Math.min(randomExp(1000),10000)));
+            return;
+        }
+        // DON'T PUT CODE HERE: the rest of this method must be wrapped tightly in a try/catch block. We need
+        // to handle all types of failure here, since the scheduler is not going to know how to handle our
+        // errors. ALSO, we have to release the lock obtained above (IMPORTANT), lest the whole system come
+        // to a grinding halt.
         try {
+            
             ProcessInstanceDAO instance;
             if (we.isInMem())
                 instance = _contexts.inMemDao.getConnection().getInstance(we.getIID());
@@ -287,6 +313,8 @@
             __log.error(__msgs.msgScheduledJobFailed(we.getDetail()),t);
             throw new Scheduler.JobProcessorException(false);
 
+        } finally {
+            _instanceLockManager.unlock(we.getIID());
         }
     }
 
@@ -318,8 +346,7 @@
         // Do a delay for debugging purposes.
         if (_delayMean != 0)
             try {
-                double u = _random.nextDouble(); // Uniform
-                long delay = (long) (-Math.log(u) * _delayMean); // Exponential
+                long delay = randomExp(_delayMean);
                 // exponential distribution
                 // with mean
                 // _delayMean
@@ -330,6 +357,11 @@
             }
     }
 
+    /**
+     * Draws a random value from an exponential distribution with the given mean.
+     *
+     * @param mean mean of the distribution; the result is in the same unit as this value
+     * @return the sampled value, truncated to a <code>long</code>
+     */
+    private long randomExp(double mean) {
+        // nextDouble() is uniform on [0,1) and may return exactly 0.0; log(0) is -Infinity, which the
+        // cast below would turn into Long.MAX_VALUE. 1-u is still uniform, but on (0,1], so log is finite.
+        double u = 1.0 - _random.nextDouble();
+        return (long) (-Math.log(u) * mean); // Inverse-CDF sampling of the exponential distribution
+    }
     void fireEvent(BpelEvent event) {
         // Note that the eventListeners list is a copy-on-write array, so no need
         // to mess with synchronization.