You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2007/07/03 23:00:09 UTC

svn commit: r552978 - in /incubator/ode/branches/bart: axis2/src/main/java/org/apache/ode/axis2/ bpel-dao/src/main/java/org/apache/ode/bpel/dao/ bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ jbi/src/main/java/org/apache/ode/jbi/

Author: mszefler
Date: Tue Jul  3 14:00:06 2007
New Revision: 552978

URL: http://svn.apache.org/viewvc?view=rev&rev=552978
Log:
BART: BpelEngineImpl refactor/elimination

Modified:
    incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java
    incubator/ode/branches/bart/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java
    incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java
    incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java

Modified: incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java?view=diff&rev=552978&r1=552977&r2=552978
==============================================================================
--- incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java (original)
+++ incubator/ode/branches/bart/axis2/src/main/java/org/apache/ode/axis2/MessageExchangeContextImpl.java Tue Jul  3 14:00:06 2007
@@ -40,7 +40,12 @@
     public MessageExchangeContextImpl(ODEServer server) {
     }
 
-    public void invokePartner(PartnerRoleMessageExchange partnerRoleMessageExchange) throws ContextException {
+
+    public void invokePartnerAsynch(PartnerRoleMessageExchange mex) throws ContextException {
+        throw new UnsupportedOperationException();
+    }
+
+    public void invokePartnerBlocking(PartnerRoleMessageExchange partnerRoleMessageExchange) throws ContextException {
         if (__log.isDebugEnabled())
             __log.debug("Invoking a partner operation: " + partnerRoleMessageExchange.getOperationName());
 
@@ -48,14 +53,29 @@
         if (__log.isDebugEnabled())
             __log.debug("The service to invoke is the external service " + service);
         service.invoke(partnerRoleMessageExchange);
+        
+    }
+
+    public void invokePartnerReliable(PartnerRoleMessageExchange mex) throws ContextException {
+        throw new UnsupportedOperationException();
     }
 
+    public void invokePartnerTransacted(PartnerRoleMessageExchange mex) throws ContextException {
+        throw new UnsupportedOperationException();        
+    }
+
+    
+
     public void onAsyncReply(MyRoleMessageExchange myRoleMessageExchange) throws BpelEngineException {
         if (__log.isDebugEnabled())
             __log.debug("Processing an async reply from service " + myRoleMessageExchange.getServiceName());
 
         // Nothing to do, no callback is necessary, the client just synchronizes itself with the
         // mex reply when invoking the engine.
+    }
+
+    public void onReliableReply(MyRoleMessageExchange myRoleMex) throws BpelEngineException {
+        // We don't support this yet, so not much to do here. 
     }
 
 }

Modified: incubator/ode/branches/bart/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java?view=diff&rev=552978&r1=552977&r2=552978
==============================================================================
--- incubator/ode/branches/bart/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java (original)
+++ incubator/ode/branches/bart/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java Tue Jul  3 14:00:06 2007
@@ -260,4 +260,4 @@
 
     void setFailureType(String failureType);
 
-}
+ }

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java?view=diff&rev=552978&r1=552977&r2=552978
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java Tue Jul  3 14:00:06 2007
@@ -23,7 +23,7 @@
 
     ResponseFuture _future;
     
-    public AsyncMyRoleMessageExchangeImpl(BpelEngineImpl engine, String mexId) {
+    public AsyncMyRoleMessageExchangeImpl(BpelServerImpl engine, String mexId) {
         super(engine, mexId);
     }
 
@@ -33,7 +33,7 @@
         
         _future = new ResponseFuture();
 
-        BpelProcess target = _engine.route(_callee, _request);
+        BpelProcess target = _server.route(_callee, _request);
         if (target == null) {
             if (__log.isWarnEnabled())
                 __log.warn(__msgs.msgUnknownEPR("" + _epr));

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java?view=diff&rev=552978&r1=552977&r2=552978
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java Tue Jul  3 14:00:06 2007
@@ -16,7 +16,7 @@
     Future<Status> _future;
     boolean _done = false;
     
-    public BlockingMyRoleMessageExchangeImpl(BpelEngineImpl engine, String mexId) {
+    public BlockingMyRoleMessageExchangeImpl(BpelServerImpl engine, String mexId) {
         super(engine, mexId);
     }
 

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java?view=diff&rev=552978&r1=552977&r2=552978
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java Tue Jul  3 14:00:06 2007
@@ -8,7 +8,7 @@
  * with the License.  You may obtain a copy of the License at
  *
  *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ 
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -65,10 +65,6 @@
 class BpelEngineImpl {
     private static final Log __log = LogFactory.getLog(BpelEngineImpl.class);
 
-    /** RNG, for delays */
-    private Random _random = new Random(System.currentTimeMillis());
-
-    private static double _delayMean = 0;
 
     private static final Messages __msgs = MessageBundle.getMessages(Messages.class);
 
@@ -81,18 +77,7 @@
         _contexts = contexts;
     }
 
-     OProcess getOProcess(QName processId) {
-        BpelProcess process = _activeProcesses.get(processId);
-
-        if (process == null)
-            return null;
-
-        return process.getOProcess();
-    }
 
-    public void processJob(WorkEvent we) throws BpelEngineException {
-        }
-    }
 
     private boolean checkRetry(final JobInfo jobInfo, Throwable t) {
         // TODO, better handling of failed jobs (put them in the DB perhaps?)
@@ -138,39 +123,6 @@
 
         // No more retries.
         return false;
-    }
-
-    /**
-     * Block the thread for random amount of time. Used for testing for races and the like. The delay generated is exponentially
-     * distributed with the mean obtained from the <code>ODE_DEBUG_TX_DELAY</code> environment variable.
-     */
-    private void debuggingDelay() {
-        // Do a delay for debugging purposes.
-        if (_delayMean != 0)
-            try {
-                long delay = randomExp(_delayMean);
-                // distribution
-                // with mean
-                // _delayMean
-                __log.warn("Debugging delay has been activated; delaying transaction for " + delay + "ms.");
-                Thread.sleep(delay);
-            } catch (InterruptedException e) {
-                ; // ignore
-            }
-    }
-
-    private long randomExp(double mean) {
-        double u = _random.nextDouble(); // Uniform
-        long delay = (long) (-Math.log(u) * mean); // Exponential
-        return delay;
-    }
-
-    void fireEvent(BpelEvent event) {
-        // Note that the eventListeners list is a copy-on-write array, so no need
-        // to mess with synchronization.
-        for (org.apache.ode.bpel.iapi.BpelEventListener l : _contexts.eventListeners) {
-            l.onEvent(event);
-        }
     }
 
     /**

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=552978&r1=552977&r2=552978
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Tue Jul  3 14:00:06 2007
@@ -19,11 +19,14 @@
 package org.apache.ode.bpel.engine;
 
 import java.io.InputStream;
+import java.sql.Date;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
@@ -44,7 +47,6 @@
 import org.apache.ode.bpel.explang.ConfigurationException;
 import org.apache.ode.bpel.explang.EvaluationException;
 import org.apache.ode.bpel.iapi.BpelEngineException;
-import org.apache.ode.bpel.iapi.ContextException;
 import org.apache.ode.bpel.iapi.Endpoint;
 import org.apache.ode.bpel.iapi.EndpointReference;
 import org.apache.ode.bpel.iapi.InvocationStyle;
@@ -53,8 +55,10 @@
 import org.apache.ode.bpel.iapi.ProcessConf;
 import org.apache.ode.bpel.iapi.Scheduler;
 import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
+import org.apache.ode.bpel.iapi.MessageExchange.Status;
 import org.apache.ode.bpel.intercept.InterceptorInvoker;
 import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
+import org.apache.ode.bpel.memdao.BpelDAOConnectionFactoryImpl;
 import org.apache.ode.bpel.o.OElementVarType;
 import org.apache.ode.bpel.o.OExpressionLanguage;
 import org.apache.ode.bpel.o.OMessageVarType;
@@ -68,6 +72,7 @@
 import org.apache.ode.jacob.soup.ReplacementMap;
 import org.apache.ode.utils.ObjectPrinter;
 import org.apache.ode.utils.msg.MessageBundle;
+import org.omg.CosNaming.IstringHelper;
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
@@ -76,7 +81,7 @@
 /**
  * Entry point into the runtime of a BPEL process.
  * 
- * @author Maciej Szefler 
+ * @author Maciej Szefler
  * @author Matthieu Riou <mriou at apache dot org>
  */
 public class BpelProcess {
@@ -108,8 +113,6 @@
     /** Last time the process was used. */
     private volatile long _lastUsed;
 
-    BpelEngineImpl _engine;
-
     DebuggerSupport _debugger;
 
     ExpressionLanguageRuntimeRegistry _expLangRuntimeRegistry;
@@ -125,14 +128,33 @@
     private HydrationLatch _hydrationLatch;
 
     private Contexts _contexts;
-    
+
     /** Manage instance-level locks. */
     private final InstanceLockManager _instanceLockManager = new InstanceLockManager();
 
+    private final Set<InvocationStyle> _invocationStyles;
+
+    private BpelDAOConnectionFactoryImpl _inMemDao;
+
+    private Random _random = new Random();
+
+    private BpelServerImpl _server;
+
     public BpelProcess(ProcessConf conf, BpelEventListener debugger) {
         _pid = conf.getProcessId();
         _pconf = conf;
         _hydrationLatch = new HydrationLatch();
+        _inMemDao = new BpelDAOConnectionFactoryImpl(_contexts.scheduler);
+
+        // TODO : do this on a per-partnerlink basis, support transacted styles.
+        HashSet<InvocationStyle> istyles = new HashSet<InvocationStyle>();
+        istyles.add(InvocationStyle.BLOCKING);
+        if (!conf.isTransient()) {
+            istyles.add(InvocationStyle.ASYNC);
+            istyles.add(InvocationStyle.RELIABLE);
+        }
+
+        _invocationStyles = Collections.unmodifiableSet(istyles);
     }
 
     public String toString() {
@@ -159,41 +181,37 @@
      * 
      * @param mex
      */
-    void invokeProcess(MyRoleMessageExchangeImpl mex) {
+    void invokeProcess(MessageExchangeDAO mexdao) {
         _hydrationLatch.latch(1);
         try {
-            MessageExchangeDAO mexdao = getDAO(mex);
-
-            PartnerLinkMyRoleImpl target = getMyRoleForService(mex.getServiceName());
+            PartnerLinkMyRoleImpl target = getMyRoleForService(mexdao.getCallee());
             if (target == null) {
-                String errmsg = __msgs.msgMyRoleRoutingFailure(mex.getMessageExchangeId());
+                String errmsg = __msgs.msgMyRoleRoutingFailure(mexdao.getMessageExchangeId());
                 __log.error(errmsg);
-                mex.setFailure(MessageExchange.FailureType.UNKNOWN_ENDPOINT, errmsg, null);
+                mexdao.setFailureType(MessageExchange.FailureType.UNKNOWN_ENDPOINT.toString());
+                mexdao.setFaultExplanation(errmsg);
+                mexdao.setStatus(Status.FAILURE.toString());
                 return;
             }
 
-            getDAO(mex).setProcess(getProcessDAO());
+            mexdao.setProcess(getProcessDAO());
 
-            if (!processInterceptors(mex, InterceptorInvoker.__onProcessInvoked)) {
-                __log.debug("Aborting processing of mex " + mex + " due to interceptors.");
-                return;
-            }
+            // TODO: fix this
+            // if (!processInterceptors(mex, InterceptorInvoker.__onProcessInvoked)) {
+            // __log.debug("Aborting processing of mex " + mex + " due to interceptors.");
+            // return;
+            // }
 
             markused();
-            target.invokeMyRole(mex);
-            markused();
+            target.invokeMyRole(mexdao);
         } finally {
             _hydrationLatch.release(1);
         }
 
-        // For a one way, once the engine is done, the mex can be safely released.
-        if (mex.getMessageExchangePattern().equals(MessageExchange.MessageExchangePattern.REQUEST_ONLY)) {
-            mex.release();
-        }
-    }
-
-    private MessageExchangeDAO getDAO(MyRoleMessageExchangeImpl mex) {
-
+        // TODO: relocate this code // For a one way, once the engine is done, the mex can be safely released.
+        // if (mex.getMessageExchangePattern().equals(MessageExchange.MessageExchangePattern.REQUEST_ONLY)) {
+        // mex.release();
+        // }
     }
 
     private PartnerLinkMyRoleImpl getMyRoleForService(QName serviceName) {
@@ -291,59 +309,40 @@
      * @return <code>true</code> if execution should continue, <code>false</code> otherwise
      */
     boolean processInterceptors(MyRoleMessageExchangeImpl mex, InterceptorInvoker invoker) {
-        InterceptorContextImpl ictx = new InterceptorContextImpl(_engine._contexts.dao.getConnection(), getProcessDAO(), _pconf);
-
-        for (MessageExchangeInterceptor i : _mexInterceptors)
-            if (!mex.processInterceptor(i, mex, ictx, invoker))
-                return false;
-        for (MessageExchangeInterceptor i : getEngine().getGlobalInterceptors())
-            if (!mex.processInterceptor(i, mex, ictx, invoker))
-                return false;
-
+        // InterceptorContextImpl ictx = new InterceptorContextImpl(_contexts.dao.getConnection(), getProcessDAO(), _pconf);
+        //
+        // for (MessageExchangeInterceptor i : _mexInterceptors)
+        // if (!mex.processInterceptor(i, mex, ictx, invoker))
+        // return false;
+        // for (MessageExchangeInterceptor i : getEngine().getGlobalInterceptors())
+        // if (!mex.processInterceptor(i, mex, ictx, invoker))
+        // return false;
+        //
         return true;
     }
 
-    
     /*
-
-        // DONT PUT CODE HERE-need this method real tight in a try/catch block, we need to handle
-        // all types of failure here, the scheduler is not going to know how to handle our errors,
-        // ALSO we have to release the lock obtained above (IMPORTANT), lest the whole system come
-        // to a grinding halt.
-        try {
-
-            ProcessInstanceDAO instance;
-            if (process.isInMemory())
-                instance = _contexts.inMemDao.getConnection().getInstance(we.getIID());
-            else
-                instance = _contexts.dao.getConnection().getInstance(we.getIID());
-
-            if (instance == null) {
-                __log.error(__msgs.msgScheduledJobReferencesUnknownInstance(we.getIID()));
-                // nothing we can do, this instance is not in the database, it will
-                // always
-                // fail.
-                return;
-            }
-            ProcessDAO processDao = instance.getProcess();
-            process = _activeProcesses.get(processDao.getProcessId());
-
-            process.handleWorkEvent(we.getDetail());
-            debuggingDelay();
-        } catch (BpelEngineException bee) {
-            __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), bee);
-            throw new Scheduler.JobProcessorException(bee, checkRetry(jobInfo, bee));
-        } catch (ContextException ce) {
-            __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), ce);
-            throw new Scheduler.JobProcessorException(ce, checkRetry(jobInfo, ce));
-        } catch (RuntimeException rte) {
-            __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), rte);
-            throw new Scheduler.JobProcessorException(rte, checkRetry(jobInfo, rte));
-        } catch (Throwable t) {
-            __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), t);
-            throw new Scheduler.JobProcessorException(false);
-
-
+     * // DONT PUT CODE HERE-need this method real tight in a try/catch block, we need to handle // all types of failure here, the
+     * scheduler is not going to know how to handle our errors, // ALSO we have to release the lock obtained above (IMPORTANT), lest
+     * the whole system come // to a grinding halt. try {
+     * 
+     * ProcessInstanceDAO instance; if (process.isInMemory()) instance =
+     * _contexts.inMemDao.getConnection().getInstance(we.getIID()); else instance =
+     * _contexts.dao.getConnection().getInstance(we.getIID());
+     * 
+     * if (instance == null) { __log.error(__msgs.msgScheduledJobReferencesUnknownInstance(we.getIID())); // nothing we can do, this
+     * instance is not in the database, it will // always // fail. return; } ProcessDAO processDao = instance.getProcess(); process =
+     * _activeProcesses.get(processDao.getProcessId());
+     * 
+     * process.handleWorkEvent(we.getDetail()); debuggingDelay(); } catch (BpelEngineException bee) {
+     * __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), bee); throw new Scheduler.JobProcessorException(bee,
+     * checkRetry(jobInfo, bee)); } catch (ContextException ce) { __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), ce);
+     * throw new Scheduler.JobProcessorException(ce, checkRetry(jobInfo, ce)); } catch (RuntimeException rte) {
+     * __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), rte); throw new Scheduler.JobProcessorException(rte,
+     * checkRetry(jobInfo, rte)); } catch (Throwable t) { __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), t); throw new
+     * Scheduler.JobProcessorException(false);
+     * 
+     * 
      */
     /**
      * @see org.apache.ode.bpel.engine.BpelProcess#handleWorkEvent(java.util.Map<java.lang.String,java.lang.Object>)
@@ -364,9 +363,8 @@
                 if (__log.isDebugEnabled()) {
                     __log.debug("InvokeInternal event for mexid " + we.getMexId());
                 }
-                ReliableMyRoleMessageExchangeImpl mex = (ReliableMyRoleMessageExchangeImpl) _engine.getMessageExchange(we
-                        .getMexId());
-                invokeProcess(mex);
+                MessageExchangeDAO mexdao = loadMexDao(we.getMexId());
+                invokeProcess(mexdao);
             } else {
                 // Instance level events
                 // We lock the instance to prevent concurrent transactions and prevent unnecessary rollbacks,
@@ -390,7 +388,7 @@
                     __log.debug("Instance " + we.getIID() + " is busy, rescheduling job.");
                     // TODO: This should really be more of something like the exponential backoff algorithm in ethernet.
                     _contexts.scheduler.schedulePersistedJob(jobInfo.jobDetail, new Date(System.currentTimeMillis()
-                            + Math.min(randomExp(1000), 10000)));
+                            + _random.nextInt(1000)));
                     return;
                 }
 
@@ -435,6 +433,11 @@
         }
     }
 
+    private MessageExchangeDAO loadMexDao(String mexId) {
+        return isInMemory() ? _inMemDao.getConnection().getMessageExchange(mexId) : _contexts.dao.getConnection()
+                .getMessageExchange(mexId);
+    }
+
     private void setRoles(OProcess oprocess) {
         _partnerRoles = new HashMap<OPartnerLink, PartnerLinkPartnerRoleImpl>();
         _myRoles = new HashMap<OPartnerLink, PartnerLinkMyRoleImpl>();
@@ -485,8 +488,7 @@
     }
 
     ProcessDAO getProcessDAO() {
-        return _pconf.isTransient() ? _engine._contexts.inMemDao.getConnection().getProcess(_pid) : getEngine()._contexts.dao
-                .getConnection().getProcess(_pid);
+        return isInMemory() ? _inMemDao.getConnection().getProcess(_pid) : _contexts.dao.getConnection().getProcess(_pid);
     }
 
     static String genCorrelatorId(OPartnerLink plink, String opName) {
@@ -539,7 +541,7 @@
     void deactivate() {
         // Deactivate all the my-role endpoints.
         for (Endpoint endpoint : _myEprs.keySet())
-            _engine._contexts.bindingContext.deactivateMyRoleEndpoint(endpoint);
+            _contexts.bindingContext.deactivateMyRoleEndpoint(endpoint);
 
         // TODO Deactivate all the partner-role channels
     }
@@ -653,10 +655,10 @@
 
     MyRoleMessageExchangeImpl createMyRoleMex(MessageExchangeDAO mexdao) {
         InvocationStyle istyle = InvocationStyle.valueOf(mexdao.getInvocationStyle());
-        
+
         _hydrationLatch.latch(1);
         try {
-            MyRoleMessageExchangeImpl mex = new ReliableMyRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId());
+            MyRoleMessageExchangeImpl mex = new ReliableMyRoleMessageExchangeImpl(_server, mexdao.getMessageExchangeId());
             OPartnerLink plink = (OPartnerLink) _oprocess.getChild(mexdao.getPartnerLinkModelId());
             PortType ptype = plink.myRolePortType;
             Operation op = plink.getMyRoleOperation(mexdao.getOperation());
@@ -667,7 +669,7 @@
         }
     }
 
-    PartnerRoleMessageExchangeImpl createPartnerRoleMex(MessageExchangeDAO  mexdao) {
+    PartnerRoleMessageExchangeImpl createPartnerRoleMex(MessageExchangeDAO mexdao) {
         InvocationStyle istyle = InvocationStyle.valueOf(mexdao.getInvocationStyle());
         PartnerRoleMessageExchangeImpl mex;
         _hydrationLatch.latch(1);
@@ -677,24 +679,27 @@
             Operation op = plink.getPartnerRoleOperation(mexdao.getOperation());
             switch (istyle) {
             case BLOCKING:
-                mex = new BlockingPartnerRoleMessageExchangeImpl(_engine, mexdao.getMessageExchangeId(), ptype, op, isInMemory(), null, /* EPR todo */
-                plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
+                mex = new BlockingPartnerRoleMessageExchangeImpl(_server, mexdao.getMessageExchangeId(), ptype, op, isInMemory(),
+                        null, /* EPR todo */
+                        plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
                 break;
             case ASYNC:
-                mex = new AsyncPartnerRoleMessageExchangeImpl(_engine, mexdao.getMessageExchangeId(), ptype, op, isInMemory(),
+                mex = new AsyncPartnerRoleMessageExchangeImpl(_server, mexdao.getMessageExchangeId(), ptype, op, isInMemory(),
                         null, /* EPR todo */
                         plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
                 break;
 
             case TRANSACTED:
-                mex = new TransactedPartnerRoleMessageExchangeImpl(_engine, mexdao.getMessageExchangeId(), ptype, op, isInMemory(), null, /* EPR todo */
-                plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
+                mex = new TransactedPartnerRoleMessageExchangeImpl(_server, mexdao.getMessageExchangeId(), ptype, op, isInMemory(),
+                        null, /* EPR todo */
+                        plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
                 break;
             case RELIABLE:
-                mex = new ReliablePartnerRoleMessageExchangeImpl(_engine, mexdao.getMessageExchangeId(), ptype, op, isInMemory(), null, /* EPR todo */
-                plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
+                mex = new ReliablePartnerRoleMessageExchangeImpl(_server, mexdao.getMessageExchangeId(), ptype, op, isInMemory(),
+                        null, /* EPR todo */
+                        plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
                 break;
-                
+
             default:
                 throw new BpelEngineException("Unexpected InvocationStyle: " + istyle);
 
@@ -706,6 +711,10 @@
 
     }
 
+    Set<InvocationStyle> getSupportedInvocationStyle(QName serviceId) {
+        return _invocationStyles;
+    }
+
     private Map<Endpoint, PartnerLinkMyRoleImpl> getEndpointToMyRoleMap() {
         _hydrationLatch.latch(1);
         try {
@@ -724,10 +733,6 @@
         }
     }
 
-    BpelEngineImpl getEngine() {
-        return _engine;
-    }
-
     public boolean isInMemory() {
         return _pconf.isTransient();
     }
@@ -853,7 +858,7 @@
 
             if (!_hydratedOnce) {
                 for (PartnerLinkPartnerRoleImpl prole : _partnerRoles.values()) {
-                    PartnerRoleChannel channel = _engine._contexts.bindingContext.createPartnerRoleChannel(_pid,
+                    PartnerRoleChannel channel = _contexts.bindingContext.createPartnerRoleChannel(_pid,
                             prole._plinkDef.partnerRolePortType, prole._initialPartner);
                     prole._channel = channel;
                     _partnerChannels.put(prole._initialPartner, prole._channel);
@@ -880,16 +885,16 @@
             }
 
             if (isInMemory()) {
-                bounceProcessDAO(_engine._contexts.inMemDao.getConnection(), _pid, _pconf.getVersion(), _oprocess);
-            } else if (_engine._contexts.scheduler.isTransacted()) {
+                bounceProcessDAO(_inMemDao.getConnection(), _pid, _pconf.getVersion(), _oprocess);
+            } else if (_contexts.scheduler.isTransacted()) {
                 // If we have a transaction, we do this in the current transaction.
-                bounceProcessDAO(_engine._contexts.dao.getConnection(), _pid, _pconf.getVersion(), _oprocess);
+                bounceProcessDAO(_contexts.dao.getConnection(), _pid, _pconf.getVersion(), _oprocess);
             } else {
                 // If we do not have a transaction we need to create one.
                 try {
-                    _engine._contexts.scheduler.execIsolatedTransaction(new Callable<Object>() {
+                    _contexts.scheduler.execIsolatedTransaction(new Callable<Object>() {
                         public Object call() throws Exception {
-                            bounceProcessDAO(_engine._contexts.dao.getConnection(), _pid, _pconf.getVersion(), _oprocess);
+                            bounceProcessDAO(_contexts.dao.getConnection(), _pid, _pconf.getVersion(), _oprocess);
                             return null;
                         }
                     });
@@ -902,4 +907,13 @@
         }
 
     }
+
+    MessageExchangeDAO createMessageExchange(final char dir) {
+        if (isInMemory()) {
+            return _inMemDao.getConnection().createMessageExchange(dir);
+        } else {
+            return _contexts.dao.getConnection().createMessageExchange(dir);
+        }
+    }
+
 }

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?view=diff&rev=552978&r1=552977&r2=552978
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java Tue Jul  3 14:00:06 2007
@@ -25,6 +25,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
+import java.util.Random;
 import java.util.Set;
 import java.util.WeakHashMap;
 import java.util.concurrent.Callable;
@@ -93,24 +94,28 @@
     /** Maximum age of a process before it is quiesced */
     private static Long __processMaxAge;
 
+    /** RNG, for delays */
+    private Random _random = new Random(System.currentTimeMillis());
+
+    private static double _delayMean = 0;
+
     /**
      * Set of processes that are registered with the server. Includes hydrated and dehydrated processes. Guarded by
      * _mngmtLock.writeLock().
      */
     private final HashMap<QName, BpelProcess> _registeredProcesses = new HashMap<QName, BpelProcess>();
 
-    /** Mapping from myrole endpoint name to active process. */
-    private final HashMap<Endpoint, BpelProcess> _serviceMap = new HashMap<Endpoint, BpelProcess>();
+    /** Mapping from myrole service name to active process. */
+    private final HashMap<QName, BpelProcess> _serviceMap = new HashMap<QName, BpelProcess>();
 
     private State _state = State.SHUTDOWN;
 
-    private Contexts _contexts = new Contexts();
+    Contexts _contexts = new Contexts();
 
     private DehydrationPolicy _dehydrationPolicy;
 
     private Properties _configProperties;
 
-
     BpelDatabase _db;
 
     /**
@@ -273,9 +278,9 @@
 
             for (Endpoint e : process.getServiceNames()) {
                 __log.debug("Register process: serviceId=" + e + ", process=" + process);
-                _serviceMap.put(e, process);
+                _serviceMap.put(e.serviceName, process);
             }
-            
+
             process.activate(_contexts);
 
             _registeredProcesses.put(process.getPID(), process);
@@ -350,15 +355,12 @@
         // TODO: use the message to route to the correct service if more than
         // one service is listening on the same endpoint.
 
-        BpelProcess routed = null;
-        for (Endpoint endpoint : _serviceMap.keySet()) {
-            if (endpoint.serviceName.equals(service))
-                routed = _serviceMap.get(endpoint);
-        }
-        if (__log.isDebugEnabled())
-            __log.debug("Routed: svcQname " + service + " --> " + routed);
-        return routed;
-
+        _mngmtLock.readLock().lock();
+        try {
+            return _serviceMap.get(service);
+        } finally {
+            _mngmtLock.readLock().unlock();
+        }
     }
 
     /**
@@ -446,10 +448,6 @@
         _contexts.dao = daoCF;
     }
 
-    public void setInMemDaoConnectionFactory(BpelDAOConnectionFactory daoCF) {
-        _contexts.inMemDao = daoCF;
-    }
-
     public void setBindingContext(BindingContext bc) {
         _contexts.bindingContext = bc;
     }
@@ -467,13 +465,7 @@
             Callable<String> createDao = new Callable<String>() {
 
                 public String call() throws Exception {
-                    MessageExchangeDAO dao;
-                    if (target.isInMemory()) {
-                        dao = _contexts.inMemDao.getConnection().createMessageExchange(
-                                MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
-                    } else {
-                        dao = _contexts.dao.getConnection().createMessageExchange(MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
-                    }
+                    MessageExchangeDAO dao = target.createMessageExchange(MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
                     dao.setInvocationStyle(istyle.toString());
                     dao.setCorrelationId(clientKey);
                     dao.setCorrelationStatus(CorrelationStatus.UKNOWN_ENDPOINT.toString());
@@ -554,7 +546,7 @@
                         return null;
 
                     ProcessDAO pdao = mexdao.getProcess();
-                    BpelProcess process = pdao == null ? null : _engine._activeProcesses.get(pdao.getProcessId());
+                    BpelProcess process = pdao == null ? null : _registeredProcesses.get(pdao.getProcessId());
 
                     if (process == null) {
                         String errmsg = __msgs.msgProcessNotActive(pdao.getProcessId());
@@ -607,23 +599,73 @@
 
         _mngmtLock.readLock().lock();
         try {
-
+            BpelProcess process = _serviceMap.get(serviceId);
+            if (process == null)
+                throw new BpelEngineException("No such service: " + serviceId);
+            return process.getSupportedInvocationStyle(serviceId);
         } finally {
             _mngmtLock.readLock().unlock();
         }
     }
 
-    
     void registerMessageExchangeStateListener(MessageExchangeStateListener mexStateListener) {
         WeakReference<MessageExchangeStateListener> ref = new WeakReference<MessageExchangeStateListener>(mexStateListener);
 
     }
 
+    OProcess getOProcess(QName processId) {
+        _mngmtLock.readLock().lock();
+        try {
+            BpelProcess process = _registeredProcesses.get(processId);
+
+            if (process == null)
+                return null;
+
+            return process.getOProcess();
+
+        } finally {
+            _mngmtLock.readLock().unlock();
+        }
+    }
+
     protected void assertTransaction() {
         if (!_contexts.scheduler.isTransacted())
             throw new BpelEngineException("Operation must be performed in a transaction!");
     }
 
+    void fireEvent(BpelEvent event) {
+        // Note that the eventListeners list is a copy-on-write array, so there is
+        // no need to mess with synchronization.
+        for (org.apache.ode.bpel.iapi.BpelEventListener l : _contexts.eventListeners) {
+            l.onEvent(event);
+        }
+    }
+
+    /**
+     * Block the thread for random amount of time. Used for testing for races and the like. The delay generated is exponentially
+     * distributed with the mean obtained from the <code>ODE_DEBUG_TX_DELAY</code> environment variable.
+     */
+    private void debuggingDelay() {
+        // Do a delay for debugging purposes.
+        if (_delayMean != 0)
+            try {
+                long delay = randomExp(_delayMean);
+                // delay is drawn from an
+                // exponential distribution
+                // with mean _delayMean
+                __log.warn("Debugging delay has been activated; delaying transaction for " + delay + "ms.");
+                Thread.sleep(delay);
+            } catch (InterruptedException e) {
+                ; // ignore
+            }
+    }
+
+    private long randomExp(double mean) {
+        double u = _random.nextDouble(); // Uniform
+        long delay = (long) (-Math.log(u) * mean); // Exponential
+        return delay;
+    }
+
     private class ProcessDefReaper implements Runnable {
         public void run() {
             __log.debug("Starting process definition reaper thread.");
@@ -661,6 +703,4 @@
         }
     }
 
-    
-    
 }

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java?view=diff&rev=552978&r1=552977&r2=552978
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java Tue Jul  3 14:00:06 2007
@@ -45,7 +45,6 @@
     BindingContext bindingContext;
 
     BpelDAOConnectionFactory dao;
-    BpelDAOConnectionFactory inMemDao;
 
     /** Global Message-Exchange interceptors. Must be copy-on-write!!! */ 
     final List<MessageExchangeInterceptor >globalIntereceptors = new CopyOnWriteArrayList<MessageExchangeInterceptor>();

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java?view=diff&rev=552978&r1=552977&r2=552978
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java Tue Jul  3 14:00:06 2007
@@ -93,7 +93,7 @@
 
     Contexts _contexts;
 
-    BpelEngineImpl _engine;
+    BpelServerImpl _server;
 
     boolean _associated;
 
@@ -121,9 +121,9 @@
 
     private Set<String> _propNames;
 
-    public MessageExchangeImpl(BpelEngineImpl engine, String mexId) {
+    public MessageExchangeImpl(BpelServerImpl engine, String mexId) {
         _contexts = engine._contexts;
-        _engine = engine;
+        _server = engine;
         _mexId = mexId;
     }
 

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java?view=diff&rev=552978&r1=552977&r2=552978
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java Tue Jul  3 14:00:06 2007
@@ -28,7 +28,7 @@
 
     protected QName _callee;
 
-    public MyRoleMessageExchangeImpl(BpelEngineImpl engine, String mexId) {
+    public MyRoleMessageExchangeImpl(BpelServerImpl engine, String mexId) {
         super(engine, mexId);
     }
 
@@ -116,8 +116,8 @@
         doInTX(new InDbAction<Void>() {
 
             public Void call(MessageExchangeDAO mexdao) {
-                _engine._contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
-                _engine._contexts.scheduler.schedulePersistedJob(we1.getDetail(), null);
+                _server._contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
+                _server._contexts.scheduler.schedulePersistedJob(we1.getDetail(), null);
                 return null;
             }
 
@@ -134,9 +134,9 @@
      * @return <code>true</code> if execution should continue, <code>false</code> otherwise
      */
     protected boolean processInterceptors(InterceptorInvoker invoker, MessageExchangeDAO mexDao) {
-        InterceptorContextImpl ictx = new InterceptorContextImpl(_engine._contexts.dao.getConnection(), mexDao.getProcess(), null);
+        InterceptorContextImpl ictx = new InterceptorContextImpl(_server._contexts.dao.getConnection(), mexDao.getProcess(), null);
 
-        for (MessageExchangeInterceptor i : _engine.getGlobalInterceptors())
+        for (MessageExchangeInterceptor i : _server.getGlobalInterceptors())
             if (!processInterceptor(i, this, ictx, invoker))
                 return false;
 

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java?view=diff&rev=552978&r1=552977&r2=552978
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java Tue Jul  3 14:00:06 2007
@@ -57,7 +57,7 @@
     public static final int TIMEOUT = 2 * 60 * 1000;
 
     
-    public ReliableMyRoleMessageExchangeImpl(BpelEngineImpl engine, String mexId) {
+    public ReliableMyRoleMessageExchangeImpl(BpelServerImpl engine, String mexId) {
         super(engine, mexId);
 
         // RELIABLE means we are bound to a transaction
@@ -76,7 +76,7 @@
         if (_status != Status.NEW)
             throw new BpelEngineException("Invalid state: " + _status);
         
-        final BpelProcess target = _engine.route(_callee, _request);
+        final BpelProcess target = _server.route(_callee, _request);
         if (target == null) {
             if (__log.isWarnEnabled())
                 __log.warn(__msgs.msgUnknownEPR("" + _epr));

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java?view=diff&rev=552978&r1=552977&r2=552978
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java Tue Jul  3 14:00:06 2007
@@ -9,7 +9,7 @@
  */
 public class TransactedMyRoleMessageExchangeImpl extends MyRoleMessageExchangeImpl {
 
-    public TransactedMyRoleMessageExchangeImpl(BpelEngineImpl engine, String mexId) {
+    public TransactedMyRoleMessageExchangeImpl(BpelServerImpl engine, String mexId) {
         super(engine, mexId);
     }
 

Modified: incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java?view=diff&rev=552978&r1=552977&r2=552978
==============================================================================
--- incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java (original)
+++ incubator/ode/branches/bart/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java Tue Jul  3 14:00:06 2007
@@ -1,4 +1,4 @@
-/*
+at/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information