You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2007/09/06 07:46:57 UTC

svn commit: r573153 [4/9] - in /ode/trunk: ./ axis2/src/main/java/org/apache/ode/axis2/ bpel-api/src/ bpel-api/src/main/java/org/apache/ode/bpel/explang/ bpel-api/src/main/java/org/apache/ode/bpel/iapi/ bpel-api/src/main/java/org/apache/ode/bpel/pmapi/...

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Wed Sep  5 22:46:42 2007
@@ -18,6 +18,16 @@
  */
 package org.apache.ode.bpel.engine;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+
+import javax.wsdl.Operation;
+import javax.xml.namespace.QName;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ode.bpel.common.CorrelationKey;
@@ -27,7 +37,6 @@
 import org.apache.ode.bpel.dao.CorrelatorDAO;
 import org.apache.ode.bpel.dao.MessageDAO;
 import org.apache.ode.bpel.dao.MessageExchangeDAO;
-import org.apache.ode.bpel.dao.MessageRouteDAO;
 import org.apache.ode.bpel.dao.PartnerLinkDAO;
 import org.apache.ode.bpel.dao.ProcessDAO;
 import org.apache.ode.bpel.dao.ProcessInstanceDAO;
@@ -36,17 +45,20 @@
 import org.apache.ode.bpel.evt.*;
 import org.apache.ode.bpel.iapi.BpelEngineException;
 import org.apache.ode.bpel.iapi.ContextException;
-import org.apache.ode.bpel.iapi.Endpoint;
 import org.apache.ode.bpel.iapi.EndpointReference;
-import org.apache.ode.bpel.iapi.Message;
+import org.apache.ode.bpel.iapi.InvocationStyle;
 import org.apache.ode.bpel.iapi.MessageExchange;
+import org.apache.ode.bpel.iapi.PartnerRoleChannel;
+import org.apache.ode.bpel.iapi.MessageExchange.AckType;
 import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
 import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
-import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
-import org.apache.ode.bpel.memdao.ProcessInstanceDaoImpl;
+import org.apache.ode.bpel.iapi.MessageExchange.Status;
+import org.apache.ode.bpel.o.OElementVarType;
+import org.apache.ode.bpel.o.OMessageVarType;
+import org.apache.ode.bpel.o.OPartnerLink;
+import org.apache.ode.bpel.o.OProcess;
+import org.apache.ode.bpel.o.OScope;
 import org.apache.ode.bpel.o.OMessageVarType.Part;
-import org.apache.ode.bpel.o.*;
 import org.apache.ode.bpel.runtime.BpelJacobRunnable;
 import org.apache.ode.bpel.runtime.BpelRuntimeContext;
 import org.apache.ode.bpel.runtime.CorrelationSetInstance;
@@ -71,15 +83,11 @@
 import org.w3c.dom.Node;
 import org.w3c.dom.Document;
 
-import javax.wsdl.Operation;
-import javax.xml.namespace.QName;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.List;
-
+/**
+ * 
+ * 
+ * @author Maciej Szefler
+ */
 class BpelRuntimeContextImpl implements BpelRuntimeContext {
 
     private static final Log __log = LogFactory.getLog(BpelRuntimeContextImpl.class);
@@ -96,58 +104,79 @@
     /** JACOB ExecutionQueue (state) */
     protected ExecutionQueueImpl _soup;
 
-    private MyRoleMessageExchangeImpl _instantiatingMessageExchange;
+    private MessageExchangeDAO _instantiatingMessageExchange;
 
-    private OutstandingRequestManager _outstandingRequests;
+    private BpelInstanceWorker _instanceWorker;
 
     private BpelProcess _bpelProcess;
 
     /** Five second maximum for continous execution. */
     private long _maxReductionTimeMs = 2000000;
 
-    public BpelRuntimeContextImpl(BpelProcess bpelProcess, ProcessInstanceDAO dao, PROCESS PROCESS,
-                                  MyRoleMessageExchangeImpl instantiatingMessageExchange) {
-        _bpelProcess = bpelProcess;
-        _dao = dao;
-        _iid = dao.getInstanceId();
-        _instantiatingMessageExchange = instantiatingMessageExchange;
-        _vpu = new JacobVPU();
-        _vpu.registerExtension(BpelRuntimeContext.class, this);
+    private Contexts _contexts;
 
-        _soup = new ExecutionQueueImpl(null);
-        _soup.setReplacementMap(_bpelProcess.getReplacementMap());
-        _outstandingRequests = new OutstandingRequestManager();
-        _vpu.setContext(_soup);
+    private boolean _executed;
 
-        if (bpelProcess.isInMemory()) {
-            ProcessInstanceDaoImpl inmem = (ProcessInstanceDaoImpl) _dao;
-            if (inmem.getSoup() != null) {
-                _soup = (ExecutionQueueImpl) inmem.getSoup();
-                _outstandingRequests = (OutstandingRequestManager) _soup.getGlobalData();
-                _vpu.setContext(_soup);
-            }
+    BpelRuntimeContextImpl(BpelInstanceWorker instanceWorker, ProcessInstanceDAO dao) {
+        this(instanceWorker, dao, new ExecutionQueueImpl(null));
+
+        // The following allows us to skip deserialization of the soup if our execution state in memory is the same
+        // as that in the database.
+        Object cachedState = instanceWorker.getCachedState(dao.getExecutionStateCounter());
+        if (cachedState != null) {
+            if (__log.isDebugEnabled())
+                __log.debug("CACHE HIT: Using cached state #" + dao.getExecutionStateCounter() + " to resume instance " + dao.getInstanceId());
+            _soup = (ExecutionQueueImpl) cachedState; 
+            _soup.setReplacementMap(_bpelProcess.getReplacementMap());
+            _vpu.setContext(_soup);
         } else {
+            if (__log.isDebugEnabled())
+                __log.debug("CACHE MISS: Loading state to resume instance " + dao.getInstanceId() + " from database ");
             byte[] daoState = dao.getExecutionState();
-            if (daoState != null) {
-                ByteArrayInputStream iis = new ByteArrayInputStream(daoState);
-                try {
-                    _soup.read(iis);
-                } catch (Exception ex) {
-                    throw new RuntimeException(ex);
-                }
-                _outstandingRequests = (OutstandingRequestManager) _soup.getGlobalData();
+            ByteArrayInputStream iis = new ByteArrayInputStream(daoState);
+            try {
+                _soup.read(iis);
+            } catch (Exception ex) {
+                throw new RuntimeException(ex);
             }
         }
+    }
 
-        if (PROCESS != null) {
-            _vpu.inject(PROCESS);
-        }
+    BpelRuntimeContextImpl(BpelInstanceWorker instanceWorker, ProcessInstanceDAO dao, PROCESS PROCESS,
+            MessageExchangeDAO instantiatingMessageExchange) {
 
+        this(instanceWorker, dao, new ExecutionQueueImpl(null));
+
+        if (PROCESS == null)
+            throw new NullPointerException();
+        if (instantiatingMessageExchange == null)
+            throw new NullPointerException();
+        _soup.setGlobalData(new OutstandingRequestManager());
+        _instantiatingMessageExchange = instantiatingMessageExchange;
+        _vpu.inject(PROCESS);
+
+    }
+
+    BpelRuntimeContextImpl(BpelInstanceWorker instanceWorker, ProcessInstanceDAO dao, ExecutionQueueImpl soup) {
+        _instanceWorker = instanceWorker;
+        _bpelProcess = instanceWorker._process;
+        _contexts = instanceWorker._contexts;
+        _dao = dao;
+        _iid = dao.getInstanceId();
+        _vpu = new JacobVPU();
+        _vpu.registerExtension(BpelRuntimeContext.class, this);
+        _soup = soup;
+        _soup.setReplacementMap(_bpelProcess.getReplacementMap());
+        _vpu.setContext(_soup);
         if (BpelProcess.__log.isDebugEnabled()) {
             __log.debug("BpelRuntimeContextImpl created for instance " + _iid + ". INDEXED STATE=" + _soup.getIndex());
         }
     }
 
+    public String toString() {
+        return "{BpelRuntimeCtx PID=" + _bpelProcess.getPID() + ", IID=" + _iid + "}";
+    }
+
     public Long getPid() {
         return _iid;
     }
@@ -189,8 +218,8 @@
             BpelProcess.__log.debug("ProcessImpl completed with fault '" + faultData.getFaultName() + "'");
         }
 
-        _dao.setFault(faultData.getFaultName(), faultData.getExplanation(), faultData.getFaultLineNo(), faultData
-                .getActivityId(), faultData.getFaultMessage());
+        _dao.setFault(faultData.getFaultName(), faultData.getExplanation(), faultData.getFaultLineNo(), faultData.getActivityId(),
+                faultData.getFaultMessage());
 
         // send event
         ProcessInstanceStateChangeEvent evt = new ProcessInstanceStateChangeEvent();
@@ -202,7 +231,7 @@
         sendEvent(new ProcessCompletionEvent(faultData.getFaultName()));
         _dao.finishCompletion();
 
-        faultOutstandingMessageExchanges(faultData);
+        cleanupOutstandingMyRoleExchanges(faultData);
     }
 
     /**
@@ -223,17 +252,16 @@
         sendEvent(new ProcessCompletionEvent(null));
         _dao.finishCompletion();
 
-        completeOutstandingMessageExchanges();
+        cleanupOutstandingMyRoleExchanges();
     }
 
     /**
-     * @see BpelRuntimeContext#createScopeInstance(Long,
-     *      org.apache.ode.bpel.o.OScope)
+     * @see BpelRuntimeContext#createScopeInstance(Long, org.apache.ode.bpel.o.OScope)
      */
     public Long createScopeInstance(Long parentScopeId, OScope scope) {
         if (BpelProcess.__log.isTraceEnabled()) {
-            BpelProcess.__log.trace(ObjectPrinter.stringifyMethodEnter("createScopeInstance", new Object[] {
-                    "parentScopeId", parentScopeId, "scope", scope }));
+            BpelProcess.__log.trace(ObjectPrinter.stringifyMethodEnter("createScopeInstance", new Object[] { "parentScopeId",
+                    parentScopeId, "scope", scope }));
         }
 
         ScopeDAO parent = null;
@@ -255,8 +283,8 @@
 
         ScopeDAO parent = _dao.getScope(parentScopeId);
         for (OPartnerLink partnerLink : partnerLinks) {
-            PartnerLinkDAO pdao = parent.createPartnerLink(partnerLink.getId(), partnerLink.name,
-                    partnerLink.myRoleName, partnerLink.partnerRoleName);
+            PartnerLinkDAO pdao = parent.createPartnerLink(partnerLink.getId(), partnerLink.name, partnerLink.myRoleName,
+                    partnerLink.partnerRoleName);
             // If there is a myrole on the link, initialize the session id so it
             // is always
             // available for opaque correlations. The myrole session id should
@@ -266,12 +294,11 @@
         }
     }
 
-    public void select(PickResponseChannel pickResponseChannel, Date timeout, boolean createInstance,
-                       Selector[] selectors) throws FaultException {
+    public void select(PickResponseChannel pickResponseChannel, Date timeout, boolean createInstance, Selector[] selectors)
+            throws FaultException {
         if (BpelProcess.__log.isTraceEnabled())
             BpelProcess.__log.trace(ObjectPrinter.stringifyMethodEnter("select", new Object[] { "pickResponseChannel",
-                    pickResponseChannel, "timeout", timeout, "createInstance", createInstance,
-                    "selectors", selectors }));
+                    pickResponseChannel, "timeout", timeout, "createInstance", createInstance, "selectors", selectors }));
 
         ProcessDAO processDao = _dao.getProcess();
 
@@ -297,12 +324,11 @@
             correlators.add(processDao.getCorrelator(correlatorId));
         }
 
-        int conflict = _outstandingRequests.findConflict(selectors);
+        int conflict = getORM().findConflict(selectors);
         if (conflict != -1)
-            throw new FaultException(_bpelProcess.getOProcess().constants.qnConflictingReceive, selectors[conflict]
-                    .toString());
+            throw new FaultException(_bpelProcess.getOProcess().constants.qnConflictingReceive, selectors[conflict].toString());
 
-        _outstandingRequests.register(pickResponseChannelStr, selectors);
+        getORM().register(pickResponseChannelStr, selectors);
 
         // TODO - ODE-58
 
@@ -314,48 +340,16 @@
             for (int i = 0; i < correlators.size(); ++i) {
                 CorrelatorDAO ci = correlators.get(i);
                 if (ci.equals(_dao.getInstantiatingCorrelator())) {
-                    inputMsgMatch(pickResponseChannelStr, i, _instantiatingMessageExchange);
+                    injectMyRoleMessageExchange(pickResponseChannelStr, i, _instantiatingMessageExchange);
                     if (BpelProcess.__log.isDebugEnabled()) {
-                        BpelProcess.__log.debug("SELECT: " + pickResponseChannel
-                                + ": FOUND match for NEW instance mexRef=" + _instantiatingMessageExchange);
+                        BpelProcess.__log.debug("SELECT: " + pickResponseChannel + ": FOUND match for NEW instance mexRef="
+                                + _instantiatingMessageExchange);
                     }
                     return;
                 }
             }
         }
 
-        // if (BpelProcess.__log.isDebugEnabled()) {
-        // BpelProcess.__log.debug("SELECT: " + pickResponseChannel
-        // + ": NEW instance match NOT FOUND; CHECKING MESSAGES. ");
-        // }
-        //
-        //
-        // for (int i = 0; i < selectors.length; ++i) {
-        // CorrelatorDAO correlator = correlators.get(i);
-        // Selector selector = selectors[i];
-        // MessageExchangeDAO mexdao = correlator
-        // .dequeueMessage(selector.correlationKey);
-        // if (mexdao != null) {
-        // // Found message matching one of our selectors.
-        // if (BpelProcess.__log.isDebugEnabled()) {
-        // BpelProcess.__log.debug("SELECT: " + pickResponseChannel
-        // + ": FOUND match to MESSAGE " + mexdao + " on CKEY "
-        // + selector.correlationKey);
-        // }
-        //
-        // MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(
-        // _bpelProcess._engine, mexdao);
-        //
-        // inputMsgMatch(pickResponseChannel.export(), i, mex);
-        // return;
-        // }
-        // }
-        //
-        // if (BpelProcess.__log.isDebugEnabled()) {
-        // BpelProcess.__log.debug("SELECT: " + pickResponseChannel
-        // + ": MESSAGE match NOT FOUND.");
-        // }
-
         if (timeout != null) {
             registerTimer(pickResponseChannel, timeout);
             if (BpelProcess.__log.isDebugEnabled()) {
@@ -376,7 +370,6 @@
             }
         }
 
-
     }
 
     /**
@@ -404,15 +397,14 @@
         XmlDataDAO dataDAO = scopeDAO.getVariable(variable.declaration.name);
 
         if (dataDAO.isNull()) {
-            throw new FaultException(_bpelProcess.getOProcess().constants.qnUninitializedVariable,
-                    "The variable " + variable.declaration.name + " isn't properly initialized.");
+            throw new FaultException(_bpelProcess.getOProcess().constants.qnUninitializedVariable, "The variable "
+                    + variable.declaration.name + " isn't properly initialized.");
         }
 
         return dataDAO.get();
     }
 
-    public Node fetchVariableData(VariableInstance var, OMessageVarType.Part part, boolean forWriting)
-            throws FaultException {
+    public Node fetchVariableData(VariableInstance var, OMessageVarType.Part part, boolean forWriting) throws FaultException {
         Node container = fetchVariableData(var, forWriting);
 
         // If we want a specific part, we will need to navigate through the
@@ -450,9 +442,9 @@
     }
 
     /**
-     * Evaluate a property alias query expression against a variable, returning
-     * the normalized {@link String} representation of the property value.
-     *
+     * Evaluate a property alias query expression against a variable, returning the normalized {@link String} representation of the
+     * property value.
+     * 
      * @param variable
      *            variable to read
      * @param property
@@ -468,8 +460,7 @@
         String val = _bpelProcess.extractProperty((Element) varData, alias, variable.declaration.getDescription());
 
         if (BpelProcess.__log.isTraceEnabled()) {
-            BpelProcess.__log.trace("readPropertyAlias(variable=" + variable + ", alias=" + alias + ") = "
-                    + val.toString());
+            BpelProcess.__log.trace("readPropertyAlias(variable=" + variable + ", alias=" + alias + ") = " + val.toString());
         }
 
         return val;
@@ -507,7 +498,7 @@
             // We have an element
             nodeQName = new QName(targetNode.getNamespaceURI(), targetNode.getLocalName());
         }
-        return _bpelProcess._engine._contexts.eprContext.convertEndpoint(nodeQName, sourceNode).toXML();
+        return _contexts.eprContext.convertEndpoint(nodeQName, sourceNode).toXML();
     }
 
     public void commitChanges(VariableInstance variable, Node changes) {
@@ -518,9 +509,9 @@
         writeProperties(variable, changes, dataDAO);
     }
 
-    public void reply(final PartnerLinkInstance plinkInstnace, final String opName, final String mexId, Element msg,
-                      QName fault) throws FaultException {
-        String mexRef = _outstandingRequests.release(plinkInstnace, opName, mexId);
+    public void reply(final PartnerLinkInstance plinkInstnace, final String opName, final String mexId, Element msg, QName fault)
+            throws FaultException {
+        String mexRef = getORM().release(plinkInstnace, opName, mexId);
 
         if (mexRef == null) {
             throw new FaultException(_bpelProcess.getOProcess().constants.qnMissingRequest);
@@ -532,61 +523,45 @@
         evt.setOperation(opName);
         evt.setPortType(plinkInstnace.partnerLink.myRolePortType.getQName());
 
-        MessageExchangeDAO mex = _dao.getConnection().getMessageExchange(mexRef);
+        // Get the "my-role" mex from the DB.
+        MessageExchangeDAO myrolemex = _dao.getConnection().getMessageExchange(mexRef);
 
-        MessageDAO message = mex.createMessage(plinkInstnace.partnerLink.getMyRoleOperation(opName).getOutput()
-                .getMessage().getQName());
+        Operation operation = plinkInstnace.partnerLink.getMyRoleOperation(opName);
+        if (operation == null || operation.getOutput() == null) {
+            // reply to operation that is either not defined or one-way
+            // Perhaps this should be detected at compile time? 
+            throw new FaultException(_bpelProcess.getOProcess().constants.qnMissingRequest,
+                    "Undefined two-way operation \"" + opName + "\".");
+            
+        }
+        
+        // TODO what if msg==null? i.e. for a reply-with-fault.
+        
+        MessageDAO message = myrolemex.createMessage(
+                operation.getOutput().getMessage()
+                .getQName());
         message.setData(msg);
 
-        MyRoleMessageExchangeImpl m = new MyRoleMessageExchangeImpl(_bpelProcess._engine, mex);
-        _bpelProcess.initMyRoleMex(m);
-        m.setResponse(new MessageImpl(message));
+        myrolemex.setResponse(message);
 
+        AckType ackType;
         if (fault != null) {
-            mex.setStatus(MessageExchange.Status.FAULT.toString());
-            mex.setFault(fault);
+            ackType = AckType.FAULT;
+            myrolemex.setFault(fault);
             evt.setAspect(ProcessMessageExchangeEvent.PROCESS_FAULT);
         } else {
-            mex.setStatus(MessageExchange.Status.RESPONSE.toString());
+            ackType = AckType.RESPONSE;
             evt.setAspect(ProcessMessageExchangeEvent.PROCESS_OUTPUT);
         }
 
-        if (mex.getPipedMessageExchangeId() != null) {
-            PartnerRoleMessageExchange pmex = (PartnerRoleMessageExchange) _bpelProcess
-                    .getEngine().getMessageExchange(mex.getPipedMessageExchangeId());
-            if (BpelProcess.__log.isDebugEnabled()) {
-                __log.debug("Replying to a p2p mex, myrole " + m + " - partnerole " + pmex);
-            }
-            try {
-                switch (m.getStatus()) {
-                    case FAILURE:
-                        // We can't seem to get the failure out of the myrole mex?
-                        pmex.replyWithFailure(MessageExchange.FailureType.OTHER, "operation failed", null);
-                        break;
-                    case FAULT:
-                        Message faultRes = pmex.createMessage(pmex.getOperation().getFault(m.getFault().getLocalPart())
-                                .getMessage().getQName());
-                        faultRes.setMessage(m.getResponse().getMessage());
-                        pmex.replyWithFault(m.getFault(), faultRes);
-                        break;
-                    case RESPONSE:
-                        Message response = pmex.createMessage(pmex.getOperation().getOutput().getMessage().getQName());
-                        response.setMessage(m.getResponse().getMessage());
-                        pmex.reply(response);
-                        break;
-                    default:
-                        __log.warn("Unexpected state: " + m.getStatus());
-                        break;
-                }
-            } finally {
-                mex.release();
-            }
-        } else _bpelProcess._engine._contexts.mexContext.onAsyncReply(m);
-
-        // send event
+        Status previousStatus = myrolemex.getStatus();
+        myrolemex.setStatus(Status.ACK);
+        myrolemex.setAckType(ackType);
+        _bpelProcess.onMyRoleMexAck(myrolemex, previousStatus);
         sendEvent(evt);
     }
 
+
     /**
      * @see BpelRuntimeContext#writeCorrelation(org.apache.ode.bpel.runtime.CorrelationSetInstance,
      *      org.apache.ode.bpel.common.CorrelationKey)
@@ -594,8 +569,7 @@
     public void writeCorrelation(CorrelationSetInstance cset, CorrelationKey correlation) {
         ScopeDAO scopeDAO = _dao.getScope(cset.scopeInstance);
         CorrelationSetDAO cs = scopeDAO.getCorrelationSet(cset.declaration.name);
-        OScope.CorrelationSet csetdef = (OScope.CorrelationSet) _bpelProcess.getOProcess()
-                .getChild(correlation.getCSetId());
+        OScope.CorrelationSet csetdef = (OScope.CorrelationSet) _bpelProcess.getOProcess().getChild(correlation.getCSetId());
         QName[] propNames = new QName[csetdef.properties.size()];
         for (int m = 0; m < csetdef.properties.size(); m++) {
             OProcess.OProperty oProperty = csetdef.properties.get(m);
@@ -610,14 +584,13 @@
     }
 
     /**
-     * Common functionality to initialize a correlation set based on data
-     * available in a variable.
-     *
+     * Common functionality to initialize a correlation set based on data available in a variable.
+     * 
      * @param cset
      *            the correlation set instance
      * @param variable
      *            variable instance
-     *
+     * 
      * @throws IllegalStateException
      *             DOCUMENTME
      */
@@ -663,64 +636,59 @@
         sendEvent(evt);
         sendEvent(new ProcessTerminationEvent());
 
-        failOutstandingMessageExchanges();
+        cleanupOutstandingMyRoleExchanges();
     }
 
     public void registerTimer(TimerResponseChannel timerChannel, Date timeToFire) {
         WorkEvent we = new WorkEvent();
         we.setIID(_dao.getInstanceId());
+        we.setProcessId(_bpelProcess.getPID());
         we.setChannel(timerChannel.export());
         we.setType(WorkEvent.Type.TIMER);
-        we.setInMem(_bpelProcess.isInMemory());
-        _bpelProcess._engine._contexts.scheduler.schedulePersistedJob(we.getDetail(), timeToFire);
+        _bpelProcess.scheduleWorkEvent(we, timeToFire);
     }
 
     private void scheduleCorrelatorMatcher(String correlatorId, CorrelationKey key) {
+
         WorkEvent we = new WorkEvent();
         we.setIID(_dao.getInstanceId());
+        we.setProcessId(_bpelProcess.getPID());
         we.setType(WorkEvent.Type.MATCHER);
         we.setCorrelatorId(correlatorId);
         we.setCorrelationKey(key);
-        we.setInMem(_bpelProcess.isInMemory());
-        _bpelProcess._engine._contexts.scheduler.scheduleVolatileJob(true, we.getDetail());
+        _bpelProcess.scheduleWorkEvent(we, null);
     }
 
     public String invoke(PartnerLinkInstance partnerLink, Operation operation, Element outgoingMessage,
-                         InvokeResponseChannel channel) throws FaultException {
+            InvokeResponseChannel channel) throws FaultException {
+
+        // TODO: move a lot of this into BpelProcess
+
+        // TODO: think we should move the dao creation into bpelprocess --mbs
+        MessageExchangeDAO mexDao = _dao.getConnection().createMessageExchange(new GUID().toString(),
+                MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE);
+        mexDao.setStatus(MessageExchange.Status.NEW);
+        mexDao.setOperation(operation.getName());
+        mexDao.setPortType(partnerLink.partnerLink.partnerRolePortType.getQName());
+        mexDao.setPartnerLinkModelId(partnerLink.partnerLink.getId());
 
+        PartnerRoleChannel partnerRoleChannel = _bpelProcess.getPartnerRoleChannel(partnerLink.partnerLink);
         PartnerLinkDAO plinkDAO = fetchPartnerLinkDAO(partnerLink);
-        // The target (partner endpoint) -- if it has not been explicitly
-        // initialized
-        // then use the value from bthe deployment descriptor ..
+
         Element partnerEPR = plinkDAO.getPartnerEPR();
+
         EndpointReference partnerEpr;
 
         if (partnerEPR == null) {
-            partnerEpr = _bpelProcess.getInitialPartnerRoleEPR(partnerLink.partnerLink);
+            partnerEpr = partnerRoleChannel.getInitialEndpointReference();
             // In this case, the partner link has not been initialized.
             if (partnerEpr == null)
                 throw new FaultException(partnerLink.partnerLink.getOwner().constants.qnUninitializedPartnerRole);
         } else {
-            partnerEpr = _bpelProcess._engine._contexts.eprContext.resolveEndpointReference(partnerEPR);
-        }
-
-        if (BpelProcess.__log.isDebugEnabled()) {
-            BpelProcess.__log.debug("INVOKING PARTNER: partnerLink=" + partnerLink +
-                    ", op=" + operation.getName() + " channel=" + channel + ")");
+            partnerEpr = _contexts.eprContext.resolveEndpointReference(partnerEPR);
         }
-
-        // prepare event
-        ProcessMessageExchangeEvent evt = new ProcessMessageExchangeEvent();
-        evt.setOperation(operation.getName());
-        evt.setPortType(partnerLink.partnerLink.partnerRolePortType.getQName());
-        evt.setAspect(ProcessMessageExchangeEvent.PARTNER_INPUT);
-
-        MessageExchangeDAO mexDao = _dao.getConnection().createMessageExchange(
-                MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE);
-        mexDao.setStatus(MessageExchange.Status.NEW.toString());
-        mexDao.setOperation(operation.getName());
-        mexDao.setPortType(partnerLink.partnerLink.partnerRolePortType.getQName());
-        mexDao.setPartnerLinkModelId(partnerLink.partnerLink.getId());
+        
+        mexDao.setEPR(partnerEpr.toXML().getDocumentElement());
         mexDao.setPartnerLink(plinkDAO);
         mexDao.setProcess(_dao.getProcess());
         mexDao.setInstance(_dao);
@@ -728,145 +696,75 @@
                 : MessageExchangePattern.REQUEST_ONLY).toString());
         mexDao.setChannel(channel == null ? null : channel.export());
 
-        // Properties used by stateful-exchange protocol.
-        String mySessionId = plinkDAO.getMySessionId();
-        String partnerSessionId = plinkDAO.getPartnerSessionId();
-
-        if ( mySessionId != null )
-            mexDao.setProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID, mySessionId);
-        if ( partnerSessionId != null )
-            mexDao.setProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID, partnerSessionId);
-
-        if (__log.isDebugEnabled())
-            __log.debug("INVOKE PARTNER (SEP): sessionId=" + mySessionId + " partnerSessionId=" + partnerSessionId);
-
         MessageDAO message = mexDao.createMessage(operation.getInput().getMessage().getQName());
         mexDao.setRequest(message);
+        mexDao.setTimeout(30000);
         message.setData(outgoingMessage);
         message.setType(operation.getInput().getMessage().getQName());
 
-        // Get he my-role EPR (if myrole exists) for optional use by partner
-        // (for callback mechanism).
-        EndpointReference myRoleEndpoint = partnerLink.partnerLink.hasMyRole() ? _bpelProcess
-                .getInitialMyRoleEPR(partnerLink.partnerLink) : null;
-        PartnerRoleMessageExchangeImpl mex = new PartnerRoleMessageExchangeImpl(_bpelProcess._engine, mexDao,
-                partnerLink.partnerLink.partnerRolePortType, operation, partnerEpr, myRoleEndpoint, _bpelProcess
-                .getPartnerRoleChannel(partnerLink.partnerLink));
-
-        BpelProcess p2pProcess = null;
-        Endpoint partnerEndpoint = _bpelProcess.getInitialPartnerRoleEndpoint(partnerLink.partnerLink);
-        if (partnerEndpoint != null)
-            p2pProcess = _bpelProcess.getEngine().route(partnerEndpoint.serviceName, mex.getRequest());
-
-        if (p2pProcess != null) {
-            // Creating a my mex using the same message id as partner mex to "pipe" them
-            MyRoleMessageExchange myRoleMex = _bpelProcess.getEngine().createMessageExchange(
-                    mex.getMessageExchangeId(), partnerEndpoint.serviceName,
-                    operation.getName(), mex.getMessageExchangeId());
-
-            if (BpelProcess.__log.isDebugEnabled()) {
-                __log.debug("Invoking in a p2p interaction, partnerrole " + mex + " - myrole " + myRoleMex);
-            }
-
-            Message odeRequest = myRoleMex.createMessage(operation.getInput().getMessage().getQName());
-            odeRequest.setMessage(outgoingMessage);
-
-            if (BpelProcess.__log.isDebugEnabled()) {
-                __log.debug("Setting myRoleMex session ids for p2p interaction, mySession "
-                        + partnerSessionId + " - partnerSess " + mySessionId);
-            }
-            if ( partnerSessionId != null )
-                myRoleMex.setProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID, partnerSessionId);
-            if ( mySessionId != null )
-                myRoleMex.setProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID, mySessionId);
+        // prepare event
+        ProcessMessageExchangeEvent evt = new ProcessMessageExchangeEvent();
+        evt.setOperation(operation.getName());
+        evt.setPortType(partnerLink.partnerLink.partnerRolePortType.getQName());
+        evt.setAspect(ProcessMessageExchangeEvent.PARTNER_INPUT);
+        evt.setMexId(mexDao.getMessageExchangeId());
+        sendEvent(evt);
 
-            mex.setStatus(MessageExchange.Status.REQUEST);
-            myRoleMex.invoke(odeRequest);
 
-            // Can't expect any sync response
-            mex.replyAsync();
-        } else {
-            // If we couldn't find the endpoint, then there is no sense
-            // in asking the IL to invoke.
-            if (partnerEpr != null) {
-                mexDao.setEPR(partnerEpr.toXML().getDocumentElement());
-                mex.setStatus(MessageExchange.Status.REQUEST);
-                _bpelProcess._engine._contexts.mexContext.invokePartner(mex);
-            } else {
-                __log.error("Couldn't find endpoint for partner EPR " + DOMUtils.domToString(partnerEPR));
-                mex.setFailure(FailureType.UNKNOWN_ENDPOINT, "UnknownEndpoint", partnerEPR);
-            }
+        if (__log.isDebugEnabled()) {
+            __log.debug("INVOKING PARTNER: partnerLink=" + partnerLink + ", op=" + operation.getName() + " channel="
+                    + channel + ")");
         }
 
-        evt.setMexId(mexDao.getMessageExchangeId());
-        sendEvent(evt);
+        _bpelProcess.invokePartner(mexDao);
+       
 
-        // MEX pattern is request only, at this point the status can only be a one way
-        if (mexDao.getPattern().equals(MessageExchangePattern.REQUEST_ONLY.toString())) {
-            mexDao.setStatus(MessageExchange.Status.ASYNC.toString());
-            // This mex can now be released
-            mexDao.release();
-        }
-        // Check if there is a synchronous response, if so, we need to inject the
-        // message on the response channel.
-        switch (mex.getStatus()) {
-            case NEW:
-                throw new AssertionError("Impossible!");
-            case ASYNC:
-                break;
-            case RESPONSE:
-            case FAULT:
-            case FAILURE:
-                invocationResponse(mex);
-                break;
-            default:
-                __log.error("Partner did not acknowledge message exchange: " + mex);
-                mex.setFailure(FailureType.NO_RESPONSE, "Partner did not acknowledge.", null);
-                invocationResponse(mex);
+        // In case a response/fault was available right away, which will happen for BLOCKING/TRANSACTED invocations,
+        // we need to inject a message on the response channel, so that the process continues.
+        switch (mexDao.getStatus()) {
+        case ACK:
+            injectPartnerResponse(mexDao.getMessageExchangeId(), mexDao.getChannel());
+            break;
+        case ASYNC:
+            // we'll have to wait for the response.
+            break;
+        default:
+            throw new AssertionError("Unexpected MEX status: " + mexDao.getStatus());
         }
-
+        
         return mexDao.getMessageExchangeId();
 
     }
 
     void execute() {
+        if (!_contexts.isTransacted())
+            throw new BpelEngineException("MUST RUN IN TRANSACTION!");
+        if (_executed)
+            throw new IllegalStateException("cannot call execute() twice!");
+
         long maxTime = System.currentTimeMillis() + _maxReductionTimeMs;
+
+        // Execute the process state reductions
         boolean canReduce = true;
         while (ProcessState.canExecute(_dao.getState()) && System.currentTimeMillis() < maxTime && canReduce) {
             canReduce = _vpu.execute();
         }
+
         _dao.setLastActiveTime(new Date());
         if (!ProcessState.isFinished(_dao.getState())) {
-            if (__log.isDebugEnabled()) __log.debug("Setting execution state on instance " + _iid);
-            _soup.setGlobalData(_outstandingRequests);
-
-            if (_bpelProcess.isInMemory()) {
-                // don't serialize in-memory processes
-                ((ProcessInstanceDaoImpl) _dao).setSoup(_soup);
-            } else {
-                ByteArrayOutputStream bos = new ByteArrayOutputStream(10000);
-                try {
-                    _soup.write(bos);
-                    bos.close();
-                } catch (Exception ex) {
-                    throw new RuntimeException(ex);
-                }
-                _dao.setExecutionState(bos.toByteArray());
-            }
+            saveState();
 
             if (ProcessState.canExecute(_dao.getState()) && canReduce) {
                 // Max time exceeded (possibly an infinite loop).
                 if (__log.isDebugEnabled())
                     __log.debug("MaxTime exceeded for instance # " + _iid);
+
                 try {
                     WorkEvent we = new WorkEvent();
                     we.setIID(_iid);
+                    we.setProcessId(_bpelProcess.getPID());
                     we.setType(WorkEvent.Type.RESUME);
-                    we.setInMem(_bpelProcess.isInMemory());
-                    if (_bpelProcess.isInMemory())
-                        _bpelProcess._engine._contexts.scheduler.scheduleVolatileJob(true, we.getDetail());
-                    else
-                        _bpelProcess._engine._contexts.scheduler.schedulePersistedJob(we.getDetail(), new Date());
+                    _contexts.scheduler.schedulePersistedJob(we.getDetail(), new Date());
                 } catch (ContextException e) {
                     __log.error("Failed to schedule resume task.", e);
                     throw new BpelEngineException(e);
@@ -875,7 +773,24 @@
         }
     }
 
-    void inputMsgMatch(final String responsechannel, final int idx, MyRoleMessageExchangeImpl mex) {
+    /** Writes _soup to a byte array, stores it on _dao under an incremented state counter, and caches the soup. */
+    private void saveState() {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream(10000);
+        try {
+            _soup.write(bos);
+            bos.close();
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+
+        int newcount = _dao.getExecutionStateCounter() + 1;
+        _dao.setExecutionStateCounter(newcount);
+        _dao.setExecutionState(bos.toByteArray());
+        _instanceWorker.setCachedState(newcount, _soup);
+        if (__log.isDebugEnabled()) __log.debug("CACHE SAVE: #" + newcount + " for instance " + _dao.getInstanceId());
+    }
+
+    void injectMyRoleMessageExchange(final String responsechannel, final int idx, MessageExchangeDAO mexdao) {
         // if we have a message match, this instance should be marked
         // active if it isn't already
         if (_dao.getState() == ProcessState.STATE_READY) {
@@ -892,9 +807,9 @@
             sendEvent(evt);
         }
 
-        _outstandingRequests.associate(responsechannel, mex.getMessageExchangeId());
+        getORM().associate(responsechannel, mexdao.getMessageExchangeId());
 
-        final String mexId = mex.getMessageExchangeId();
+        final String mexId = mexdao.getMessageExchangeId();
         _vpu.inject(new JacobRunnable() {
             private static final long serialVersionUID = 3168964409165899533L;
 
@@ -905,15 +820,15 @@
         });
     }
 
-    void timerEvent(final String timerResponseChannel) {
+    boolean injectTimerEvent(final String timerResponseChannel) {
         // In case this is a pick event, we remove routes,
         // and cancel the outstanding requests.
         _dao.getProcess().removeRoutes(timerResponseChannel, _dao);
-        _outstandingRequests.cancel(timerResponseChannel);
+        getORM().cancel(timerResponseChannel);
 
         // Ignore timer events after the process is finished.
         if (ProcessState.isFinished(_dao.getState())) {
-            return;
+            return false;
         }
 
         _vpu.inject(new JacobRunnable() {
@@ -924,7 +839,8 @@
                 responseChannel.onTimeout();
             }
         });
-        execute();
+        
+        return true;
     }
 
     public void cancel(final TimerResponseChannel timerResponseChannel) {
@@ -932,7 +848,7 @@
         // receive/reply association.
         final String id = timerResponseChannel.export();
         _dao.getProcess().removeRoutes(id, _dao);
-        _outstandingRequests.cancel(id);
+        getORM().cancel(id);
 
         _vpu.inject(new JacobRunnable() {
             private static final long serialVersionUID = 6157913683737696396L;
@@ -944,32 +860,28 @@
         });
     }
 
-    void invocationResponse(PartnerRoleMessageExchangeImpl mex) {
-        invocationResponse(mex.getDAO().getMessageExchangeId(), mex.getDAO().getChannel());
-    }
-
-    void invocationResponse(final String mexid, final String responseChannelId) {
+    void injectPartnerResponse(final String mexid, final String responseChannelId) {
         if (responseChannelId == null)
             throw new NullPointerException("Null responseChannelId");
         if (mexid == null)
             throw new NullPointerException("Null mexId");
 
         if (BpelProcess.__log.isDebugEnabled()) {
-            __log.debug("Invoking message response for mexid " + mexid + " and channel " + responseChannelId);
+            __log.debug("<invoke> response for mexid " + mexid + " and channel " + responseChannelId);
         }
         _vpu.inject(new BpelJacobRunnable() {
             private static final long serialVersionUID = -1095444335740879981L;
 
             public void run() {
-                ((BpelRuntimeContextImpl) getBpelRuntimeContext()).invocationResponse2(mexid, importChannel(
-                        responseChannelId, InvokeResponseChannel.class));
+                ((BpelRuntimeContextImpl) getBpelRuntimeContext()).invocationResponse2(mexid, importChannel(responseChannelId,
+                        InvokeResponseChannel.class));
             }
         });
     }
 
     /**
      * Continuation of the above.
-     *
+     * 
      * @param mexid
      * @param responseChannel
      */
@@ -982,23 +894,23 @@
         evt.setMexId(mexid);
         evt.setOperation(mex.getOperation());
 
-        MessageExchange.Status status = MessageExchange.Status.valueOf(mex.getStatus());
+        MessageExchange.Status status = mex.getStatus();
 
-        switch (status) {
-            case FAULT:
-                evt.setAspect(ProcessMessageExchangeEvent.PARTNER_FAULT);
-                responseChannel.onFault();
-                break;
-            case RESPONSE:
-                evt.setAspect(ProcessMessageExchangeEvent.PARTNER_OUTPUT);
-                responseChannel.onResponse();
-                break;
-            case FAILURE:
-                evt.setAspect(ProcessMessageExchangeEvent.PARTNER_FAILURE);
-                responseChannel.onFailure();
-                break;
-            default:
-                __log.error("Invalid response state for mex " + mexid + ": " + status);
+        switch (mex.getAckType()) {
+        case FAULT:
+            evt.setAspect(ProcessMessageExchangeEvent.PARTNER_FAULT);
+            responseChannel.onFault();
+            break;
+        case RESPONSE:
+            evt.setAspect(ProcessMessageExchangeEvent.PARTNER_OUTPUT);
+            responseChannel.onResponse();
+            break;
+        case FAILURE:
+            evt.setAspect(ProcessMessageExchangeEvent.PARTNER_FAILURE);
+            responseChannel.onFailure();
+            break;
+        default:
+            __log.error("Invalid response state for mex " + mexid + ": " + status);
         }
         sendEvent(evt);
     }
@@ -1014,15 +926,14 @@
         _bpelProcess._debugger.onEvent(event);
 
         // notify the listeners
-        _bpelProcess._engine.fireEvent(event);
+        _bpelProcess._server.fireEvent(event);
 
         // saving
         _bpelProcess.saveEvent(event, _dao);
     }
 
     /**
-     * We record all values of properties of a 'MessageType' variable for
-     * efficient lookup.
+     * We record all values of properties of a 'MessageType' variable for efficient lookup.
      */
     private void writeProperties(VariableInstance variable, Node value, XmlDataDAO dao) {
         if (variable.declaration.type instanceof OMessageVarType) {
@@ -1030,8 +941,7 @@
                 OProcess.OPropertyAlias alias = property.getAlias(variable.declaration.type);
                 if (alias != null) {
                     try {
-                        String val = _bpelProcess.extractProperty((Element) value, alias, variable.declaration
-                                .getDescription());
+                        String val = _bpelProcess.extractProperty((Element) value, alias, variable.declaration.getDescription());
                         if (val != null) {
                             dao.setProperty(property.name.toString(), val);
                         }
@@ -1039,68 +949,50 @@
                         // This will fail as we're basically trying to extract properties on all
                         // received messages for optimization purposes.
                         if (__log.isDebugEnabled())
-                            __log.debug("Couldn't extract property '" + property.toString()
-                                    + "' in property pre-extraction: " + e.toString());
+                            __log.debug("Couldn't extract property '" + property.toString() + "' in property pre-extraction: "
+                                    + e.toString());
                     }
                 }
             }
         }
     }
 
-    private void completeOutstandingMessageExchanges() {
-        String[] mexRefs = _outstandingRequests.releaseAll();
+    /**
+     * Called when the process completes to clean up any outstanding message exchanges.
+     * @param optionalFaultData if non-null, its string form becomes the failure explanation on two-way exchanges
+     */
+    private void cleanupOutstandingMyRoleExchanges(FaultData optionalFaultData) {
+        String[] mexRefs = getORM().releaseAll();
         for (String mexId : mexRefs) {
             MessageExchangeDAO mexDao = _dao.getConnection().getMessageExchange(mexId);
             if (mexDao != null) {
-                MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(_bpelProcess._engine, mexDao);
-                switch (mex.getStatus()) {
-                    case ASYNC:
-                    case RESPONSE:
-                        mex.setStatus(MessageExchange.Status.COMPLETED_OK);
-                        break;
-                    case REQUEST:
-                        if (mex.getPattern().equals(MessageExchange.MessageExchangePattern.REQUEST_ONLY)) {
-                            mex.setStatus(MessageExchange.Status.COMPLETED_OK);
-                            break;
-                        }
-                    default:
-                        mex.setFailure(FailureType.OTHER, "No response.", null);
-                        _bpelProcess._engine._contexts.mexContext.onAsyncReply(mex);
-                        mex.release();
+                Status status = mexDao.getStatus();
+                MessageExchangePattern pattern = MessageExchange.MessageExchangePattern.valueOf(mexDao.getPattern());
+                InvocationStyle istyle = mexDao.getInvocationStyle();
+                if (pattern == MessageExchangePattern.REQUEST_ONLY) {
+                    mexDao.setAckType(AckType.ONEWAY);
+                    mexDao.setStatus(Status.COMPLETED);
+                    continue;
                 }
+
+                mexDao.setAckType(AckType.FAILURE);
+                mexDao.setFailureType(FailureType.NO_RESPONSE);
+                // Report the supplied fault when there is one; only fall back to the generic
+                // text otherwise, so the fault's description is not overwritten.
+                mexDao.setFaultExplanation(optionalFaultData != null ? optionalFaultData.toString()
+                        : "Process completed without responding.");
+                mexDao.setStatus(Status.ACK);
+                _bpelProcess.onMyRoleMexAck(mexDao, status);
             }
         }
     }
 
-    private void faultOutstandingMessageExchanges(FaultData faultData) {
-        String[] mexRefs = _outstandingRequests.releaseAll();
-        for (String mexId : mexRefs) {
-            MessageExchangeDAO mexDao = _dao.getConnection().getMessageExchange(mexId);
-            if (mexDao != null) {
-                MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(_bpelProcess._engine, mexDao);
-                _bpelProcess.initMyRoleMex(mex);
-
-                Message message = mex.createMessage(faultData.getFaultName());
-                if (faultData.getFaultMessage() != null)
-                    message.setMessage(faultData.getFaultMessage());
-                mex.setResponse(message);
-
-                mex.setFault(faultData.getFaultName(), message);
-                mex.setFaultExplanation(faultData.getExplanation());
-                _bpelProcess._engine._contexts.mexContext.onAsyncReply(mex);
-            }
-        }
+    private OutstandingRequestManager getORM() {
+        return (OutstandingRequestManager) _soup.getGlobalData();
     }
 
-    private void failOutstandingMessageExchanges() {
-        String[] mexRefs = _outstandingRequests.releaseAll();
-        for (String mexId : mexRefs) {
-            MessageExchangeDAO mexDao = _dao.getConnection().getMessageExchange(mexId);
-            MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(_bpelProcess._engine, mexDao);
-            _bpelProcess.initMyRoleMex(mex);
-            mex.setFailure(FailureType.OTHER, "No response.", null);
-            _bpelProcess._engine._contexts.mexContext.onAsyncReply(mex);
-        }
+    private void cleanupOutstandingMyRoleExchanges() {
+        cleanupOutstandingMyRoleExchanges(null);
     }
 
     public Element getPartnerResponse(String mexId) {
@@ -1123,33 +1015,27 @@
             throw new BpelEngineException(msg);
         }
 
-        MessageExchange.Status status = MessageExchange.Status.valueOf(dao.getStatus());
-        switch (status) {
-            case ASYNC:
-            case REQUEST:
-                MessageDAO request = dao.getRequest();
-                if (request == null) {
-                    // this also should not happen
-                    String msg = "Engine requested request for message exchange that did not have one: " + mexId;
-                    __log.fatal(msg);
-                    throw new BpelEngineException(msg);
-                }
-
-                return request.getData();
-
-            default:
-                // We should not be in any other state when requesting this.
-                String msg = "Engine requested response while the message exchange " + mexId + " was in the state "
-                        + status;
-                __log.fatal(msg);
-                throw new BpelEngineException(msg);
+        MessageDAO request = dao.getRequest();
+        if (request == null) {
+            // this also should not happen
+            String msg = "Engine requested request for message exchange that did not have one: " + mexId;
+            __log.fatal(msg);
+            throw new BpelEngineException(msg);
         }
 
+        return request.getData();
+
     }
 
     public QName getPartnerFault(String mexId) {
-        MessageExchangeDAO mex = _getPartnerResponse(mexId).getMessageExchange();
-        return  mex.getFault();
+        MessageExchangeDAO dao = _dao.getConnection().getMessageExchange(mexId);
+        if (dao == null) {
+            // this should not happen....
+            String msg = "Engine requested non-existent message exchange: " + mexId;
+            __log.fatal(msg);
+            throw new BpelEngineException(msg);
+        }
+        return dao.getFault();
     }
 
     public QName getPartnerResponseType(String mexId) {
@@ -1177,24 +1063,20 @@
         }
 
         MessageDAO response;
-        MessageExchange.Status status = MessageExchange.Status.valueOf(dao.getStatus());
-        switch (status) {
-            case FAULT:
-            case RESPONSE:
-                response = dao.getResponse();
-                if (response == null) {
-                    // this also should not happen
-                    String msg = "Engine requested response for message exchange that did not have one: " + mexId;
-                    __log.fatal(msg);
-                    throw new BpelEngineException(msg);
-                }
-                break;
-            default:
-                // We should not be in any other state when requesting this.
-                String msg = "Engine requested response while the message exchange " + mexId + " was in the state "
-                        + status;
+        MessageExchange.Status status = dao.getStatus();
+        if (status == Status.ACK) {
+            response = dao.getResponse();
+            if (response == null) {
+                // this also should not happen
+                String msg = "Engine requested response for message exchange that did not have one: " + mexId;
                 __log.fatal(msg);
                 throw new BpelEngineException(msg);
+            }
+        } else {
+            // We should not be in any other state when requesting this.
+            String msg = "Engine requested response while the message exchange " + mexId + " was in the state " + status;
+            __log.fatal(msg);
+            throw new BpelEngineException(msg);
         }
         return response;
     }
@@ -1236,14 +1118,14 @@
         return dao.getProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID);
     }
 
-    public void registerActivityForRecovery(ActivityRecoveryChannel channel, long activityId, String reason,
-                                            Date dateTime, Element details, String[] actions, int retries) {
+    public void registerActivityForRecovery(ActivityRecoveryChannel channel, long activityId, String reason, Date dateTime,
+            Element details, String[] actions, int retries) {
         if (reason == null)
             reason = "Unspecified";
         if (dateTime == null)
             dateTime = new Date();
-        __log.info("ActivityRecovery: Registering activity " + activityId + ", failure reason: " + reason +
-                " on channel " + channel.export());
+        __log.info("ActivityRecovery: Registering activity " + activityId + ", failure reason: " + reason + " on channel "
+                + channel.export());
         _dao.createActivityRecovery(channel.export(), (int) activityId, reason, dateTime, details, actions, retries);
     }
 
@@ -1257,8 +1139,8 @@
 
             public void run() {
                 ActivityRecoveryChannel recovery = importChannel(channel, ActivityRecoveryChannel.class);
-                __log.info("ActivityRecovery: Recovering activity " + activityId + " with action " + action +
-                        " on channel " + recovery);
+                __log.info("ActivityRecovery: Recovering activity " + activityId + " with action " + action + " on channel "
+                        + recovery);
                 if (recovery != null) {
                     if ("cancel".equals(action))
                         recovery.cancel();
@@ -1269,7 +1151,7 @@
                 }
             }
         });
-        //_dao.deleteActivityRecovery(channel);
+        // _dao.deleteActivityRecovery(channel);
         execute();
     }
 
@@ -1291,48 +1173,5 @@
             __log.debug("initializing partner " + pLink + "  sessionId to " + session);
         fetchPartnerLinkDAO(pLink).setPartnerSessionId(session);
 
-    }
-
-    /**
-     * Attempt to match message exchanges on a correlator.
-     *
-     */
-    public void matcherEvent(String correlatorId, CorrelationKey ckey) {
-        if (BpelProcess.__log.isDebugEnabled()) {
-            __log.debug("MatcherEvent handling: correlatorId=" + correlatorId + ", ckey=" + ckey);
-        }
-        CorrelatorDAO correlator = _dao.getProcess().getCorrelator(correlatorId);
-
-        // Find the route first, this is a SELECT FOR UPDATE on the "selector" row,
-        // So we want to acquire the lock before we do anthing else.
-        MessageRouteDAO mroute = correlator.findRoute(ckey);
-        if (mroute == null) {
-            // Ok, this means that a message arrived before we did, so nothing to do.
-            __log.debug("MatcherEvent handling: nothing to do, route no longer in DB");
-            return;
-        }
-
-        // Now see if there is a message that matches this selector.
-        MessageExchangeDAO mexdao = correlator.dequeueMessage(ckey);
-        if (mexdao != null) {
-            __log.debug("MatcherEvent handling: found matching message in DB (i.e. message arrived before <receive>)");
-
-            // We have a match, so we can get rid of the routing entries.
-            correlator.removeRoutes(mroute.getGroupId(),_dao);
-
-            // Found message matching one of our selectors.
-            if (BpelProcess.__log.isDebugEnabled()) {
-                BpelProcess.__log.debug("SELECT: " + mroute.getGroupId() + ": matched to MESSAGE " + mexdao
-                        + " on CKEY " + ckey);
-            }
-
-            MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(_bpelProcess._engine, mexdao);
-
-            inputMsgMatch(mroute.getGroupId(), mroute.getIndex(), mex);
-            execute();
-        } else {
-            __log.debug("MatcherEvent handling: nothing to do, no matching message in DB");
-
-        }
     }
 }

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java Wed Sep  5 22:46:42 2007
@@ -19,33 +19,45 @@
 package org.apache.ode.bpel.engine;
 
 import java.util.ArrayList;
-import java.util.HashSet;
+import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Properties;
+import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import javax.transaction.TransactionManager;
 import javax.xml.namespace.QName;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ode.bpel.dao.BpelDAOConnection;
 import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
 import org.apache.ode.bpel.dao.ProcessDAO;
 import org.apache.ode.bpel.evt.BpelEvent;
 import org.apache.ode.bpel.iapi.BindingContext;
-import org.apache.ode.bpel.iapi.BpelEngine;
 import org.apache.ode.bpel.iapi.BpelEngineException;
 import org.apache.ode.bpel.iapi.BpelEventListener;
 import org.apache.ode.bpel.iapi.BpelServer;
+import org.apache.ode.bpel.iapi.ContextException;
+import org.apache.ode.bpel.iapi.Endpoint;
 import org.apache.ode.bpel.iapi.EndpointReferenceContext;
+import org.apache.ode.bpel.iapi.InvocationStyle;
+import org.apache.ode.bpel.iapi.Message;
+import org.apache.ode.bpel.iapi.MessageExchange;
 import org.apache.ode.bpel.iapi.MessageExchangeContext;
+import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
 import org.apache.ode.bpel.iapi.ProcessConf;
 import org.apache.ode.bpel.iapi.Scheduler;
 import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
 import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException;
-import org.apache.ode.bpel.iapi.Scheduler.Synchronizer;
 import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
 import org.apache.ode.bpel.o.OProcess;
 import org.apache.ode.utils.msg.MessageBundle;
@@ -56,45 +68,55 @@
  * <p>
  * The BPEL server implementation.
  * </p>
- *
+ * 
  * <p>
- * This implementation is intended to be thread safe. The key concurrency
- * mechanism is a "management" read/write lock that synchronizes all management
- * operations (they require "write" access) and prevents concurrent management
- * operations and processing (processing requires "read" access). Write access
- * to the lock is scoped to the method, while read access is scoped to a
+ * This implementation is intended to be thread safe. The key concurrency mechanism is a "management" read/write lock that
+ * synchronizes all management operations (they require "write" access) and prevents concurrent management operations and processing
+ * (processing requires "read" access). Write access to the lock is scoped to the method, while read access is scoped to a
  * transaction.
  * </p>
- *
+ * 
  * @author Maciej Szefler <mszefler at gmail dot com>
  * @author Matthieu Riou <mriou at apache dot org>
  */
 public class BpelServerImpl implements BpelServer, Scheduler.JobProcessor {
 
     private static final Log __log = LogFactory.getLog(BpelServerImpl.class);
+
     private static final Messages __msgs = MessageBundle.getMessages(Messages.class);
 
     /** Maximum age of a process before it is quiesced */
     private static Long __processMaxAge;
 
-    /** 
-     * Set of processes that are registered with the server. Includes hydrated and dehydrated processes.
-     * Guarded by _mngmtLock.writeLock(). 
+    /** RNG, for delays */
+    private Random _random = new Random(System.currentTimeMillis());
+
+    private static double _delayMean = 0;
+
+    /**
+     * Set of processes that are registered with the server. Includes hydrated and dehydrated processes. Guarded by
+     * _mngmtLock.writeLock().
      */
-    private final Set<BpelProcess> _registeredProcesses = new HashSet<BpelProcess>();
+    private final HashMap<QName, BpelProcess> _registeredProcesses = new HashMap<QName, BpelProcess>();
+
+    /** Mapping from myrole service name to active process. */
+    private final HashMap<QName, BpelProcess> _serviceMap = new HashMap<QName, BpelProcess>();
 
     private State _state = State.SHUTDOWN;
-    private Contexts _contexts = new Contexts();
+
+    Contexts _contexts = new Contexts();
+
     private DehydrationPolicy _dehydrationPolicy;
+
     private Properties _configProperties;
-    
-    BpelEngineImpl _engine;
+
+    private ExecutorService _exec;
+
     BpelDatabase _db;
 
     /**
-     * Management lock for synchronizing management operations and preventing
-     * processing (transactions) from occuring while management operations are
-     * in progress.
+     * Management lock for synchronizing management operations and preventing processing (transactions) from occurring while
+     * management operations are in progress.
      */
     private ReadWriteLock _mngmtLock = new ReentrantReadWriteLock();
 
@@ -121,7 +143,7 @@
 
     public BpelServerImpl() {
     }
-    
+
     public void start() {
         _mngmtLock.writeLock().lock();
         try {
@@ -132,46 +154,65 @@
 
             __log.debug("BPEL SERVER starting.");
 
+
+            if (_exec == null)
+                _exec = Executors.newCachedThreadPool();
+
+            if (_contexts.txManager == null) {
+                String errmsg = "Transaction manager not specified; call setTransactionManager(...)!";
+                __log.fatal(errmsg);
+                throw new IllegalStateException(errmsg);
+            }
+            
+            if (_contexts.scheduler == null) { 
+                String errmsg = "Scheduler not specified; call setScheduler(...)!";
+                __log.fatal(errmsg);
+                throw new IllegalStateException(errmsg);
+            }
+            
             _contexts.scheduler.start();
             _state = State.RUNNING;
             __log.info(__msgs.msgServerStarted());
-            if (_dehydrationPolicy != null) new Thread(new ProcessDefReaper()).start();
+            if (_dehydrationPolicy != null)
+                new Thread(new ProcessDefReaper()).start();
         } finally {
             _mngmtLock.writeLock().unlock();
         }
     }
 
     /**
-     * Register a global listener to receive {@link BpelEvent}s froom all
-     * processes.
+     * Register a global listener to receive {@link BpelEvent}s from all processes.
+     * 
      * @param listener
      */
     public void registerBpelEventListener(BpelEventListener listener) {
+        listener.startup(_configProperties);
+
         // Do not synchronize, eventListeners is copy-on-write array.
-    	listener.startup(_configProperties);
-    	_contexts.eventListeners.add(listener);
+        _contexts.eventListeners.add(listener);
     }
 
     /**
-     * Unregister a global listener from receive {@link BpelEvent}s from all
-     * processes.
+     * Unregister a global listener from receiving {@link BpelEvent}s from all processes.
+     * 
      * @param listener
      */
     public void unregisterBpelEventListener(BpelEventListener listener) {
         // Do not synchronize, eventListeners is copy-on-write array.
-    	try {
-    		listener.shutdown();
-    	} catch (Exception e) {
-    		__log.warn("Stopping BPEL event listener " + listener.getClass().getName() + " failed, nevertheless it has been unregistered.", e);
-    	} finally {
-    		_contexts.eventListeners.remove(listener);
-    	}
+        if (_contexts.eventListeners.remove(listener)) {
+            try {
+                listener.shutdown();
+            } catch (Exception e) {
+                __log.warn("Stopping BPEL event listener " + listener.getClass().getName()
+                        + " failed, nevertheless it has been unregistered.", e);
+            }
+        }
     }
-    
+
     private void unregisterBpelEventListeners() {
-    	for (BpelEventListener l : _contexts.eventListeners) {
-    		unregisterBpelEventListener(l);
-    	}
+        for (BpelEventListener l : _contexts.eventListeners) {
+            unregisterBpelEventListener(l);
+        }
     }
 
     public void stop() {
@@ -185,7 +226,6 @@
             __log.debug("BPEL SERVER STOPPING");
 
             _contexts.scheduler.stop();
-            _engine = null;
             _state = State.INIT;
             __log.info(__msgs.msgServerStopped());
         } finally {
@@ -201,10 +241,8 @@
 
             __log.debug("BPEL SERVER initializing ");
 
-            _db = new BpelDatabase(_contexts.dao, _contexts.scheduler);
+            _db = new BpelDatabase(_contexts);
             _state = State.INIT;
-            
-            _engine = new BpelEngineImpl(_contexts);
 
         } finally {
             _mngmtLock.writeLock().unlock();
@@ -218,7 +256,6 @@
             unregisterBpelEventListeners();
 
             _db = null;
-            _engine = null;
             _state = State.SHUTDOWN;
         } finally {
             _mngmtLock.writeLock().unlock();
@@ -226,26 +263,6 @@
 
     }
 
-    public BpelEngine getEngine() {
-        boolean registered = false;
-        _mngmtLock.readLock().lock();
-        try {
-            _contexts.scheduler.registerSynchronizer(new Synchronizer() {
-                public void afterCompletion(boolean success) {
-                    _mngmtLock.readLock().unlock();
-                }
-                public void beforeCompletion() {
-                }
-            });
-            registered = true;
-        } finally {
-            // If we failed to register the synchro,then there was an ex/throwable; we need to unlock now.
-            if (!registered)
-                _mngmtLock.readLock().unlock();
-        }
-        return _engine;
-    }
-
     public void register(ProcessConf conf) {
         if (conf == null)
             throw new NullPointerException("must specify non-null process configuration.");
@@ -263,17 +280,23 @@
 
         try {
             // If the process is already active, do nothing.
-            if (_engine.isProcessRegistered(conf.getProcessId())) {
+            if (_registeredProcesses.containsKey(conf.getProcessId())) {
                 __log.debug("skipping doRegister" + conf.getProcessId() + ") -- process is already registered");
                 return;
             }
 
             __log.debug("Registering process " + conf.getProcessId() + " with server.");
 
-            BpelProcess process = new BpelProcess(conf);
+            BpelProcess process = new BpelProcess(this, conf, null);
+
+            for (Endpoint e : process.getServiceNames()) {
+                __log.debug("Register process: serviceId=" + e + ", process=" + process);
+                _serviceMap.put(e.serviceName, process);
+            }
 
-            _engine.registerProcess(process);
-            _registeredProcesses.add(process);
+            process.activate(_contexts);
+
+            _registeredProcesses.put(process.getPID(), process);
             process.hydrate();
 
             __log.info(__msgs.msgProcessRegistered(conf.getProcessId()));
@@ -294,11 +317,13 @@
         }
 
         try {
-            BpelProcess p = null;
-            if (_engine != null) {
-                _engine.unregisterProcess(pid);
-                _registeredProcesses.remove(p);
-            }
+            BpelProcess p = _registeredProcesses.remove(pid);
+            if (p == null)
+                return;
+            
+            p.deactivate();
+            while (_serviceMap.values().remove(p))
+                ;
 
             __log.info(__msgs.msgProcessUnregistered(pid));
 
@@ -312,7 +337,9 @@
 
     /**
      * Register a global message exchange interceptor.
-     * @param interceptor message-exchange interceptor
+     * 
+     * @param interceptor
+     *            message-exchange interceptor
      */
     public void registerMessageExchangeInterceptor(MessageExchangeInterceptor interceptor) {
         // NOTE: do not synchronize, globalInterceptors is copy-on-write.
@@ -321,7 +348,9 @@
 
     /**
      * Unregister a global message exchange interceptor.
-     * @param interceptor message-exchange interceptor
+     * 
+     * @param interceptor
+     *            message-exchange interceptor
      */
     public void unregisterMessageExchangeInterceptor(MessageExchangeInterceptor interceptor) {
         // NOTE: do not synchronize, globalInterceptors is copy-on-write.
@@ -329,6 +358,28 @@
     }
 
     /**
+     * Route to a process using the service id. Note, that we do not need the endpoint name here, we are assuming that two processes
+     * would not be registered under the same service qname but different endpoint.
+     * 
+     * @param service
+     *            target service id
+     * @param request
+     *            request message
+     * @return process corresponding to the targeted service, or <code>null</code> if service identifier is not recognized.
+     */
+    BpelProcess route(QName service, Message request) {
+        // TODO: use the message to route to the correct service if more than
+        // one service is listening on the same endpoint.
+
+        _mngmtLock.readLock().lock();
+        try {
+            return _serviceMap.get(service);
+        } finally {
+            _mngmtLock.readLock().unlock();
+        }
+    }
+
+    /**
      * Check a state transition from state "i" to state "j".
      */
     private final boolean checkState(State i, State j) {
@@ -360,10 +411,273 @@
         }
     }
 
-    public void onScheduledJob(JobInfo jobInfo) throws JobProcessorException {
-        getEngine().onScheduledJob(jobInfo);
+    public void onScheduledJob(final JobInfo jobInfo) throws JobProcessorException {
+        _mngmtLock.readLock().lock();
+        try {
+            final WorkEvent we = new WorkEvent(jobInfo.jobDetail);
+            BpelProcess process = _registeredProcesses.get(we.getProcessId());
+            if (process == null) {
+                // If the process is not active, it means that we should not be
+                // doing any work on its behalf, therefore we will reschedule the
+                // events for some time in the future (1 minute).
+                _contexts.execTransaction(new Callable<Void>() {
+
+                    public Void call() throws Exception {
+                        _contexts.scheduler.jobCompleted(jobInfo.jobName);
+                        Date future = new Date(System.currentTimeMillis() + (60 * 1000));
+                        __log.info(__msgs.msgReschedulingJobForInactiveProcess(we.getProcessId(), jobInfo.jobName, future));
+                        _contexts.scheduler.schedulePersistedJob(we.getDetail(), future);            
+                        return null;
+                    }
+                    
+                });
+                return;
+            }
+
+            process.handleWorkEvent(jobInfo);
+        } catch (Exception ex) {
+            throw new JobProcessorException(ex, true);
+        } finally {
+            _mngmtLock.readLock().unlock();
+        }
+    }
+
+    public void setTransactionManager(TransactionManager txm) {
+        _contexts.txManager = txm;
+    }
+    
+    public void setDehydrationPolicy(DehydrationPolicy dehydrationPolicy) {
+        _dehydrationPolicy = dehydrationPolicy;
+    }
+
+    public void setConfigProperties(Properties configProperties) {
+        _configProperties = configProperties;
+    }
+
+    public void setMessageExchangeContext(MessageExchangeContext mexContext) throws BpelEngineException {
+        _contexts.mexContext = mexContext;
+    }
+
+    public void setScheduler(Scheduler scheduler) throws BpelEngineException {
+        _contexts.scheduler = scheduler;
+    }
+
+    public void setEndpointReferenceContext(EndpointReferenceContext eprContext) throws BpelEngineException {
+        _contexts.eprContext = eprContext;
+    }
+
+    /**
+     * Set the DAO connection factory. The DAO is used by the BPEL engine to persist information about active processes.
+     * 
+     * @param daoCF
+     *            {@link BpelDAOConnectionFactory} implementation.
+     */
+    public void setDaoConnectionFactory(BpelDAOConnectionFactory daoCF) throws BpelEngineException {
+        _contexts.dao = daoCF;
+    }
+
+    public void setBindingContext(BindingContext bc) {
+        _contexts.bindingContext = bc;
+    }
+
+    public MyRoleMessageExchange createMessageExchange(final InvocationStyle istyle, final QName targetService,
+            final String operation, final String clientKey) throws BpelEngineException {
+
+        _mngmtLock.readLock().lock();
+        try {
+            final BpelProcess target = route(targetService, null);
+
+            if (target == null)
+                throw new BpelEngineException("NoSuchService: " + targetService);
+
+            if (istyle == InvocationStyle.RELIABLE || istyle == InvocationStyle.TRANSACTED)
+                assertTransaction();
+            else
+                assertNoTransaction();
+            
+            
+            return target.createNewMyRoleMex(istyle, targetService, operation, clientKey);
+        } finally {
+            _mngmtLock.readLock().unlock();
+        }
+    }
+    
+    public MessageExchange getMessageExchange(final String mexId) throws BpelEngineException {
+
+        _mngmtLock.readLock().lock();
+        try {
+            final MessageExchangeDAO inmemdao = getInMemMexDAO(mexId);
+
+            Callable<MessageExchange> loadMex = new Callable<MessageExchange>() {
+
+                public MessageExchange call() {
+                    MessageExchangeDAO mexdao = (inmemdao == null) ? mexdao = _contexts.dao.getConnection().getMessageExchange(
+                            mexId) : inmemdao;
+                    if (mexdao == null)
+                        return null;
+
+                    ProcessDAO pdao = mexdao.getProcess();
+                    BpelProcess process = pdao == null ? null : _registeredProcesses.get(pdao.getProcessId());
+
+                    if (process == null) {
+                        String errmsg = __msgs.msgProcessNotActive(pdao.getProcessId());
+                        __log.error(errmsg);
+                        // TODO: Perhaps we should define a checked exception for this
+                        // condition.
+                        throw new BpelEngineException(errmsg);
+                    }
+
+                    InvocationStyle istyle = mexdao.getInvocationStyle();
+                    if (istyle == InvocationStyle.RELIABLE || istyle == InvocationStyle.TRANSACTED)
+                        assertTransaction();
+
+                    switch (mexdao.getDirection()) {
+                    case MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE:
+                        return process.createPartnerRoleMex(mexdao);
+                    case MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE:
+                        return process.lookupMyRoleMex(mexdao);
+                    default:
+                        String errmsg = "BpelEngineImpl: internal error, invalid MexDAO direction: " + mexId;
+                        __log.fatal(errmsg);
+                        throw new BpelEngineException(errmsg);
+                    }
+                }
+            };
+
+            try {
+                if (inmemdao != null || _contexts.isTransacted()) 
+                    return loadMex.call();
+                else 
+                    return enqueueTransaction(loadMex).get();
+            } catch (ContextException e) {
+                throw new BpelEngineException(e);
+            } catch (Exception e) {
+                throw new BpelEngineException(e);
+            }
+
+        } finally {
+            _mngmtLock.readLock().unlock();
+        }
+
+    }
+
+    public MessageExchange getMessageExchangeByForeignKey(String foreignKey) throws BpelEngineException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    public Set<InvocationStyle> getSupportedInvocationStyle(QName serviceId) {
+
+        _mngmtLock.readLock().lock();
+        try {
+            BpelProcess process = _serviceMap.get(serviceId);
+            if (process == null)
+                throw new BpelEngineException("No such service: " + serviceId);
+            return process.getSupportedInvocationStyle(serviceId);
+        } finally {
+            _mngmtLock.readLock().unlock();
+        }
+    }
+
+    MessageExchangeDAO getInMemMexDAO(String mexId) {
+        _mngmtLock.readLock().lock();
+        try {
+          for (BpelProcess p : _registeredProcesses.values()) {
+              MessageExchangeDAO mexDao = p.getInMemMexDAO(mexId);
+              if (mexDao != null)
+                  return mexDao;
+          }
+        } finally {
+            _mngmtLock.readLock().unlock();
+        }
+        
+        return null;
+    }
+    
+    OProcess getOProcess(QName processId) {
+        _mngmtLock.readLock().lock();
+        try {
+            BpelProcess process = _registeredProcesses.get(processId);
+
+            if (process == null)
+                return null;
+
+            return process.getOProcess();
+
+        } finally {
+            _mngmtLock.readLock().unlock();
+        }
+    }
+
+
+    <T> Future<T> enqueueTransaction(final Callable<T> transaction) throws ContextException {
+        return _exec.submit(new ServerCallable<T>(new TransactedCallable<T>(transaction)));
+    }
+
+    void enqueueRunnable(final Runnable runnable) {
+        _exec.submit(new ServerRunnable(runnable));
+    }
+    
+    /**
+     * Schedule a {@link Runnable} object for execution after the current transaction commits (it is not run on rollback).
+     * @param runnable
+     */
+    void scheduleRunnable(final Runnable runnable) {
+        assertTransaction();
+        _contexts.registerCommitSynchronizer(new Runnable() {
+
+            public void run() {
+                _exec.submit(new ServerRunnable(runnable));
+            }
+            
+        });
+        
     }
+
     
+    protected void assertTransaction() {
+        if (!_contexts.isTransacted())
+            throw new BpelEngineException("Operation must be performed in a transaction!");
+    }
+
+    protected void assertNoTransaction() {
+        if (_contexts.isTransacted())
+            throw new BpelEngineException("Operation must be performed outside of a transaction!");
+    }
+
+    void fireEvent(BpelEvent event) {
+        // Note that the eventListeners list is a copy-on-write array, so no need
+        // to mess with synchronization.
+        for (org.apache.ode.bpel.iapi.BpelEventListener l : _contexts.eventListeners) {
+            l.onEvent(event);
+        }
+    }
+
+    /**
+     * Block the thread for a random amount of time. Used for testing for races and the like. The delay generated is exponentially
+     * distributed with the mean obtained from the <code>ODE_DEBUG_TX_DELAY</code> environment variable.
+     */
+    private void debuggingDelay() {
+        // Do a delay for debugging purposes.
+        if (_delayMean != 0)
+            try {
+                long delay = randomExp(_delayMean);
+                // The delay is drawn from an exponential
+                // distribution whose mean is
+                // _delayMean (in milliseconds).
+                __log.warn("Debugging delay has been activated; delaying transaction for " + delay + "ms.");
+                Thread.sleep(delay);
+            } catch (InterruptedException e) {
+                ; // ignore
+            }
+    }
+
+    private long randomExp(double mean) {
+        double u = _random.nextDouble(); // Uniform
+        long delay = (long) (-Math.log(u) * mean); // Exponential
+        return delay;
+    }
+
     private class ProcessDefReaper implements Runnable {
         public void run() {
             __log.debug("Starting process definition reaper thread.");
@@ -372,16 +686,16 @@
                 while (true) {
                     Thread.sleep(pollingTime);
                     _mngmtLock.writeLock().lockInterruptibly();
-                    try { 
+                    try {
                         __log.debug("Kicking reaper, OProcess instances: " + OProcess.instanceCount);
-                        // Copying the runnning process list to avoid synchronization
+                        // Copying the running process list to avoid synchronization
                         // problems and a potential mess if a policy modifies the list
-                        List<BpelProcess> candidates = new ArrayList<BpelProcess>(_registeredProcesses);
+                        List<BpelProcess> candidates = new ArrayList<BpelProcess>(_registeredProcesses.values());
                         CollectionsX.remove_if(candidates, new MemberOfFunction<BpelProcess>() {
                             public boolean isMember(BpelProcess o) {
                                 return !o.hintIsHydrated();
                             }
-                            
+
                         });
 
                         // And the happy winners are...
@@ -401,43 +715,81 @@
         }
     }
 
-    public void setDehydrationPolicy(DehydrationPolicy dehydrationPolicy) {
-        _dehydrationPolicy = dehydrationPolicy;
+    public BpelProcess getBpelProcess(QName processId) {
+        _mngmtLock.readLock().lock();
+        try {
+            return _registeredProcesses.get(processId);
+        } finally {
+            _mngmtLock.readLock().unlock();
+        }
     }
 
-    public void setConfigProperties(Properties configProperties) {
-    	_configProperties = configProperties;
+    
+   
+    
+    class ServerRunnable implements Runnable {
+        final Runnable _work;
+        ServerRunnable(Runnable work) {
+            _work = work;
+        }
+        
+        public void run() {
+            _mngmtLock.readLock().lock();
+            try {
+                _work.run();
+            } catch (Throwable ex) {
+                __log.fatal("Internal Error", ex);
+            } finally {
+                _mngmtLock.readLock().unlock();
+            }
+        }
+        
     }
     
-    public void setMessageExchangeContext(MessageExchangeContext mexContext) throws BpelEngineException {
-        _contexts.mexContext = mexContext;
+   
+    
+    class ServerCallable<T> implements Callable<T>{
+        final Callable<T> _work;
+        ServerCallable(Callable<T> work) {
+            _work = work;
+        }
+        
+        public T call () throws Exception {
+            _mngmtLock.readLock().lock();
+            try {
+                return _work.call();
+            } catch (Exception ex) {
+                __log.fatal("Internal Error", ex);
+                throw ex;
+            } finally {
+                _mngmtLock.readLock().unlock();
+            }
+        }
+        
     }
 
-    public void setScheduler(Scheduler scheduler) throws BpelEngineException {
-        _contexts.scheduler = scheduler;
-    }
+    class TransactedCallable<T> implements Callable<T> {
+        Callable<T> _work;
 
-    public void setEndpointReferenceContext(EndpointReferenceContext eprContext) throws BpelEngineException {
-        _contexts.eprContext = eprContext;
-    }
+        TransactedCallable(Callable<T> work) {
+            _work = work;
+        }
 
-    /**
-     * Set the DAO connection factory. The DAO is used by the BPEL engine to
-     * persist information about active processes.
-     *
-     * @param daoCF
-     *            {@link BpelDAOConnectionFactory} implementation.
-     */
-    public void setDaoConnectionFactory(BpelDAOConnectionFactory daoCF) throws BpelEngineException {
-        _contexts.dao = daoCF;
+        public T call() throws Exception {
+            return _contexts.execTransaction(_work);
+        }
     }
 
-    public void setInMemDaoConnectionFactory(BpelDAOConnectionFactory daoCF) {
-        _contexts.inMemDao = daoCF;
-    }
 
-    public void setBindingContext(BindingContext bc) {
-        _contexts.bindingContext = bc;
-    }
+    class TransactedRunnable implements Runnable {
+        Runnable _work;
+
+        TransactedRunnable(Runnable work) {
+            _work = work;
+        }
 
+        public void run() {
+            _contexts.execTransaction(_work);
+        }
+    }
 }

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java Wed Sep  5 22:46:42 2007
@@ -19,22 +19,34 @@
 
 package org.apache.ode.bpel.engine;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
 import org.apache.ode.bpel.iapi.BindingContext;
+import org.apache.ode.bpel.iapi.BpelEngineException;
 import org.apache.ode.bpel.iapi.BpelEventListener;
+import org.apache.ode.bpel.iapi.ContextException;
 import org.apache.ode.bpel.iapi.EndpointReferenceContext;
 import org.apache.ode.bpel.iapi.MessageExchangeContext;
 import org.apache.ode.bpel.iapi.Scheduler;
 import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
 
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CopyOnWriteArrayList;
 
+import javax.transaction.Status;
+import javax.transaction.Synchronization;
+import javax.transaction.SystemException;
+import javax.transaction.TransactionManager;
+
 /**
- * Aggregation of all the contexts provided to the BPEL engine by the
- * integration layer.
+ * Aggregation of all the contexts provided to the BPEL engine by the integration layer.
  */
 class Contexts {
+    private static final Log __log = LogFactory.getLog(Contexts.class);
+    
+    TransactionManager txManager;
 
     MessageExchangeContext mexContext;
 
@@ -45,13 +57,85 @@
     BindingContext bindingContext;
 
     BpelDAOConnectionFactory dao;
-    BpelDAOConnectionFactory inMemDao;
 
-    /** Global Message-Exchange interceptors. Must be copy-on-write!!! */ 
-    final List<MessageExchangeInterceptor >globalIntereceptors = new CopyOnWriteArrayList<MessageExchangeInterceptor>();
+    /** Global Message-Exchange interceptors. Must be copy-on-write!!! */
+    final List<MessageExchangeInterceptor> globalIntereceptors = new CopyOnWriteArrayList<MessageExchangeInterceptor>();
 
     /** Global event listeners. Must be copy-on-write!!! */
     final List<BpelEventListener> eventListeners = new CopyOnWriteArrayList<BpelEventListener>();
 
+    public boolean isTransacted() {
+        try {
+            return txManager.getStatus() == Status.STATUS_ACTIVE;
+        } catch (SystemException e) {
+            throw new BpelEngineException(e);
+        }
+    }
+
+    public void execTransaction(final Runnable transaction) {
+        try {
+            execTransaction(new Callable<Void>() {
+
+                public Void call() throws Exception {
+                    transaction.run();
+                    return null;
+                }
+
+            });
+        } catch (Exception e) {
+            throw new BpelEngineException(e);
+        }
+
+    }
+
+    public <T> T execTransaction(Callable<T> transaction) throws Exception{
+        try {
+            txManager.begin();
+        } catch (Exception ex) {
+            String errmsg = "Internal Error, could not begin transaction.";
+            throw new BpelEngineException(errmsg, ex);
+        }
+        boolean success = false;
+        try {
+            T retval = transaction.call();
+            success = true;
+            return retval;
+        } catch (Exception ex) {
+            throw ex;
+        } finally {
+            if (success)
+                try {
+                    txManager.commit();
+                } catch (Exception ex) {
+                    __log.error("Commit failed.", ex);                    
+                    throw new BpelEngineException("Commit failed.", ex);
+                }
+            else
+                try {
+                    txManager.rollback();
+                } catch (Exception ex) {
+                    __log.error("Transaction rollback failed.", ex);
+                }
+        }
+    }
+
+    public void registerCommitSynchronizer(final Runnable runnable) {
+        try {
+            txManager.getTransaction().registerSynchronization(new Synchronization() {
+
+                public void afterCompletion(int status) {
+                    if (status == Status.STATUS_COMMITTED)
+                        runnable.run();
+                }
+
+                public void beforeCompletion() {
+
+                }
+                
+            });
+        } catch (Exception ex) {
+            throw new BpelEngineException("Error registering synchronizer." ,ex);
+        }
+    }
 
 }