You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2007/01/25 19:44:42 UTC
svn commit: r499891 -
/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
Author: mszefler
Date: Thu Jan 25 10:44:41 2007
New Revision: 499891
URL: http://svn.apache.org/viewvc?view=rev&rev=499891
Log:
Instance locking for scheduled jobs.
Modified:
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java?view=diff&rev=499891&r1=499890&r2=499891
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java Thu Jan 25 10:44:41 2007
@@ -26,6 +26,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.TimeUnit;
import javax.wsdl.Operation;
import javax.wsdl.PortType;
@@ -54,6 +55,8 @@
import org.apache.ode.bpel.o.OProcess;
import org.apache.ode.utils.msg.MessageBundle;
+import com.sun.corba.se.impl.orbutil.threadpool.TimeoutException;
+
/**
* Implementation of the {@link BpelEngine} interface: provides the server methods that should be invoked in the context of a
* transaction.
@@ -96,6 +99,9 @@
/** Mapping from myrole endpoint name to active process. */
private final HashMap<Endpoint, BpelProcess> _serviceMap = new HashMap<Endpoint, BpelProcess>();
+ /** Manage instance-level locks. */
+ private final InstanceLockManager _instanceLockManager = new InstanceLockManager();
+
final Contexts _contexts;
public BpelEngineImpl(Contexts contexts) {
@@ -242,9 +248,29 @@
public void onScheduledJob(Scheduler.JobInfo jobInfo) throws Scheduler.JobProcessorException {
WorkEvent we = new WorkEvent(jobInfo.jobDetail);
- // NOTE: wrap this method real tight in a try/catch block, we need to handle all types of
- // failure here, the scheduler is not going to know how to handle our errors.
+ // We lock the instance to prevent concurrent transactions and prevent unnecessary rollbacks.
+ // Note that we don't want to wait too long here to get our lock, since we are likely holding
+ // on to the scheduler's locks of various sorts.
+ try {
+ _instanceLockManager.lock(we.getIID(), 1, TimeUnit.MICROSECONDS);
+ } catch (InterruptedException e) {
+ // Retry later.
+ __log.debug("Thread interrupted, job will be rescheduled: " + jobInfo);
+ throw new Scheduler.JobProcessorException(true);
+ } catch (org.apache.ode.bpel.engine.InstanceLockManager.TimeoutException e) {
+ __log.debug("Instance " + we.getIID() + " is busy, rescheduling job.");
+ // TODO: This should really be more of something like the exponential backoff algorithm in
+ // ethernet.
+ _contexts.scheduler.schedulePersistedJob(jobInfo.jobDetail,
+ new Date(System.currentTimeMillis() + Math.min(randomExp(1000),10000)));
+ return;
+ }
+ // DON'T PUT CODE HERE - this method must be wrapped real tight in a try/catch block: we need to
+ // handle all types of failure here, since the scheduler is not going to know how to handle our
+ // errors. ALSO we have to release the lock obtained above (IMPORTANT), lest the whole system come
+ // to a grinding halt.
try {
+
ProcessInstanceDAO instance;
if (we.isInMem())
instance = _contexts.inMemDao.getConnection().getInstance(we.getIID());
@@ -287,6 +313,8 @@
__log.error(__msgs.msgScheduledJobFailed(we.getDetail()),t);
throw new Scheduler.JobProcessorException(false);
+ } finally {
+ _instanceLockManager.unlock(we.getIID());
}
}
@@ -318,8 +346,7 @@
// Do a delay for debugging purposes.
if (_delayMean != 0)
try {
- double u = _random.nextDouble(); // Uniform
- long delay = (long) (-Math.log(u) * _delayMean); // Exponential
+ long delay = randomExp(_delayMean);
// distribution
// with mean
// _delayMean
@@ -330,6 +357,11 @@
}
}
+ /**
+  * Draws a pseudo-random value from an exponential distribution with the given mean,
+  * using inverse-transform sampling on a uniform variate taken from {@code _random}.
+  *
+  * @param mean mean of the distribution; the job-rescheduling caller adds the result to
+  *             a millisecond timestamp, so there it is effectively a millisecond count
+  * @return the sampled value, truncated to a {@code long} by the cast
+  */
+ private long randomExp(double mean) {
+     // nextDouble() is documented for java.util.Random as uniform on [0.0, 1.0) (assuming
+     // _random is one); flip it to (0.0, 1.0] so Math.log never receives 0.0. log(0.0) is
+     // -Infinity, which would turn into Long.MAX_VALUE after the cast, i.e. an endless delay.
+     double u = 1.0 - _random.nextDouble(); // Uniform on (0, 1]
+     return (long) (-Math.log(u) * mean); // Exponential
+ }
void fireEvent(BpelEvent event) {
// Note that the eventListeners list is a copy-on-write array, so no need
// to mess with synchronization.