You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2007/07/24 22:58:14 UTC

svn commit: r559204 [2/3] - in /incubator/ode/branches/bart: ./ axis2/src/main/java/org/apache/ode/axis2/ bpel-api/src/main/java/org/apache/ode/bpel/iapi/ bpel-epr/src/main/java/org/apache/ode/il/ bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ ...

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Tue Jul 24 13:58:12 2007
@@ -18,6 +18,19 @@
  */
 package org.apache.ode.bpel.engine;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import javax.wsdl.Operation;
+import javax.wsdl.PortType;
+import javax.xml.namespace.QName;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ode.bpel.common.CorrelationKey;
@@ -44,21 +57,17 @@
 import org.apache.ode.bpel.iapi.Endpoint;
 import org.apache.ode.bpel.iapi.EndpointReference;
 import org.apache.ode.bpel.iapi.InvocationStyle;
-import org.apache.ode.bpel.iapi.Message;
 import org.apache.ode.bpel.iapi.MessageExchange;
-import org.apache.ode.bpel.iapi.MessageExchangeContext;
 import org.apache.ode.bpel.iapi.PartnerRoleChannel;
 import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
 import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
 import org.apache.ode.bpel.iapi.MessageExchange.Status;
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
-import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
 import org.apache.ode.bpel.memdao.ProcessInstanceDaoImpl;
 import org.apache.ode.bpel.o.OMessageVarType;
-import org.apache.ode.bpel.o.OMessageVarType.Part;
 import org.apache.ode.bpel.o.OPartnerLink;
 import org.apache.ode.bpel.o.OProcess;
 import org.apache.ode.bpel.o.OScope;
+import org.apache.ode.bpel.o.OMessageVarType.Part;
 import org.apache.ode.bpel.runtime.BpelJacobRunnable;
 import org.apache.ode.bpel.runtime.BpelRuntimeContext;
 import org.apache.ode.bpel.runtime.CorrelationSetInstance;
@@ -80,21 +89,9 @@
 import org.apache.ode.utils.GUID;
 import org.apache.ode.utils.Namespaces;
 import org.apache.ode.utils.ObjectPrinter;
-import org.omg.CORBA._PolicyStub;
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
 
-import javax.wsdl.Operation;
-import javax.xml.namespace.QName;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-
 /**
  * 
  * 
@@ -118,15 +115,17 @@
 
     private MessageExchangeDAO _instantiatingMessageExchange;
 
-    /** Object for keeping track of all the outstanding <invoke>s. */
+    /** Object for keeping track of all the outstanding <pick>/<receive> activities */
     private OutstandingRequestManager _outstandingRequests;
 
-    /** List of BLOCKING invocations that need to be deferred until the end of the current TX */
-    private List<PartnerRoleMessageExchange> _todoBlockingCalls = new LinkedList<PartnerRoleMessageExchange>();
+    /** List of pending invocations that need to be deferred until the end of the current TX */
+    private List<PartnerRoleMessageExchangeImpl> _pendingPartnerRoleInvokes = new LinkedList<PartnerRoleMessageExchangeImpl>();
 
-    /** List of ASYNC invocations that need to be deferred until the end of the current TX. */
-    private List<PartnerRoleMessageExchange> _todoAsyncCalls = new LinkedList<PartnerRoleMessageExchange>();
+    /** List of pending ASYNC responses that need to be deferred until the end of the current TX. */
+    private List<MyRoleMessageExchangeImpl> _pendingMyRoleReplies = new LinkedList<MyRoleMessageExchangeImpl>();
 
+    private BpelInstanceWorker _instanceWorker;
+    
     private BpelProcess _bpelProcess;
 
     /** Five second maximum for continous execution. */
@@ -134,10 +133,13 @@
 
     private Contexts _contexts;
 
-    public BpelRuntimeContextImpl(BpelProcess bpelProcess, ProcessInstanceDAO dao, PROCESS PROCESS,
+    private boolean _executed;
+
+    public BpelRuntimeContextImpl(BpelInstanceWorker instanceWorker, ProcessInstanceDAO dao, PROCESS PROCESS,
             MessageExchangeDAO instantiatingMessageExchange) {
-        _bpelProcess = bpelProcess;
-        _contexts = bpelProcess._contexts;
+        _instanceWorker = instanceWorker;
+        _bpelProcess = instanceWorker._process;
+        _contexts = instanceWorker._contexts;
         _dao = dao;
         _iid = dao.getInstanceId();
         _instantiatingMessageExchange = instantiatingMessageExchange;
@@ -149,24 +151,16 @@
         _outstandingRequests = new OutstandingRequestManager();
         _vpu.setContext(_soup);
 
-        if (bpelProcess.isInMemory()) {
-            ProcessInstanceDaoImpl inmem = (ProcessInstanceDaoImpl) _dao;
-            if (inmem.getSoup() != null) {
-                _soup = (ExecutionQueueImpl) inmem.getSoup();
-                _outstandingRequests = (OutstandingRequestManager) _soup.getGlobalData();
-                _vpu.setContext(_soup);
-            }
-        } else {
-            byte[] daoState = dao.getExecutionState();
-            if (daoState != null) {
-                ByteArrayInputStream iis = new ByteArrayInputStream(daoState);
-                try {
-                    _soup.read(iis);
-                } catch (Exception ex) {
-                    throw new RuntimeException(ex);
-                }
-                _outstandingRequests = (OutstandingRequestManager) _soup.getGlobalData();
+        byte[] daoState = dao.getExecutionState();
+        if (daoState != null) {
+            assert !_bpelProcess.isInMemory() : "did not expect to rehydrate in-mem process!";
+            ByteArrayInputStream iis = new ByteArrayInputStream(daoState);
+            try {
+                _soup.read(iis);
+            } catch (Exception ex) {
+                throw new RuntimeException(ex);
             }
+            _outstandingRequests = (OutstandingRequestManager) _soup.getGlobalData();
         }
 
         if (PROCESS != null) {
@@ -232,7 +226,7 @@
         sendEvent(new ProcessCompletionEvent(faultData.getFaultName()));
         _dao.finishCompletion();
 
-        faultOutstandingMessageExchanges(faultData);
+        cleanupOutstandingMyRoleExchanges(faultData);
     }
 
     /**
@@ -253,7 +247,7 @@
         sendEvent(new ProcessCompletionEvent(null));
         _dao.finishCompletion();
 
-        completeOutstandingMessageExchanges();
+        cleanupOutstandingMyRoleExchanges();
     }
 
     /**
@@ -313,7 +307,7 @@
             evt.setNewState(ProcessState.STATE_READY);
             sendEvent(evt);
         } else if (_bpelProcess.isInMemory()) {
-            // This condition should be detected with static analysis, but just in case. 
+            // This condition should be detected with static analysis, but just in case.
             throw new InvalidProcessException("In-memory process must not receive additional messages.");
         }
 
@@ -558,11 +552,11 @@
             InvocationStyle istyle = InvocationStyle.valueOf(myrolemex.getInvocationStyle());
             switch (istyle) {
             case RELIABLE:
-                scheduleReliableResponse(myrolemex.getMessageExchangeId());
-                break;                
+                scheduleReliableResponse(myrolemex);
+                break;
             case ASYNC:
-               scheduleAsyncResponse(myrolemex.getMessageExchangeId());
-               break;
+                scheduleAsyncResponse(myrolemex);
+                break;
             default:
                 // DO NOTHING
                 break;
@@ -572,6 +566,7 @@
 
         sendEvent(evt);
     }
+
     /**
      * @see BpelRuntimeContext#writeCorrelation(org.apache.ode.bpel.runtime.CorrelationSetInstance,
      *      org.apache.ode.bpel.common.CorrelationKey)
@@ -646,7 +641,7 @@
         sendEvent(evt);
         sendEvent(new ProcessTerminationEvent());
 
-        failOutstandingMessageExchanges();
+        cleanupOutstandingMyRoleExchanges();
     }
 
     public void registerTimer(TimerResponseChannel timerChannel, Date timeToFire) {
@@ -663,7 +658,7 @@
         we.setType(WorkEvent.Type.MATCHER);
         we.setCorrelatorId(correlatorId);
         we.setCorrelationKey(key);
-        _contexts.scheduler.scheduleVolatileJob(true, we.getDetail());
+        _contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
     }
 
     public String invoke(PartnerLinkInstance partnerLink, Operation operation, Element outgoingMessage,
@@ -765,46 +760,59 @@
      */
     private void invokeIL(PartnerLinkInstance partnerLink, Operation operation, Element outgoingMessage,
             PartnerRoleChannel partnerRoleChannel, EndpointReference partnerEpr, MessageExchangeDAO mexDao) {
-        if (partnerEpr != null) {
-            // If we couldn't find the endpoint, then there is no sense
-            // in asking the IL to invoke.
-            mexDao.setEPR(partnerEpr.toXML().getDocumentElement());
-            mexDao.setStatus(MessageExchange.Status.REQUEST.toString());
-            Set<InvocationStyle> supportedStyles = _contexts.mexContext.getSupportedInvocationStyle(partnerRoleChannel, partnerEpr);
-            if (supportedStyles.contains(InvocationStyle.RELIABLE)) {
-                // If RELIABLE is supported, this is easy, we just do it in-line.
-                throw new UnsupportedOperationException(); // TODO
-                ReliablePartnerRoleMessageExchangeImpl reliableMex = new ReliablePartnerRoleMessageExchangeImpl();
-                _contexts.mexContext.invokePartnerReliable(reliableMex);
-            } else if (supportedStyles.contains(InvocationStyle.TRANSACTED)) {
-                // If TRANSACTED is supported, this is again easy, do it in-line.
-                throw new UnsupportedOperationException(); // TODO
-                TransactedPartnerRoleMessageExchangeImpl transactedMex = new TransactedPartnerRoleMessageExchangeImpl();
-                _contexts.mexContext.invokePartnerTransacted(transactedMex);
-            } else if (supportedStyles.contains(InvocationStyle.BLOCKING)) {
-                // For BLOCKING invocation, we defer the call until after commit (unless idempotent).
-                BlockingPartnerRoleMessageExchangeImpl blockingMex = new BlockingPartnerRoleMessageExchangeImpl();
-                _todoBlockingCalls.add(blockingMex);
-            } else if (supportedStyles.contains(InvocationStyle.ASYNC)) {
-                // For ASYNC style, we defer the call until after commit (unless idempotent).
-                AsyncPartnerRoleMessageExchangeImpl asyncMex = new AsyncPartnerRoleMessageExchangeImpl();
-                _todoAsyncCalls.add(asyncMex);
-            } else {
-                // This really should not happen, indicates IL is screwy.
-                __log.error("Integration Layer did not agree to any known invocation style for EPR "
-                        + DOMUtils.domToString(partnerEPR));
-                mexDao.setFailureType(FailureType.COMMUNICATION_ERROR.toString());
-                mexDao.setStatus(Status.FAILURE.toString());
-                mexDao.setFaultExplanation("NoMatchingStyle");
-            }
-
-        } else {
-            __log.error("Couldn't find endpoint for partner EPR " + DOMUtils.domToString(partnerEPR));
+        // If we couldn't find the endpoint, then there is no sense
+        // in asking the IL to invoke.
+        if (partnerEpr == null) {
+            __log.error("Couldn't find endpoint for partner EPR ");
             mexDao.setFailureType(FailureType.UNKNOWN_ENDPOINT.toString());
             mexDao.setFaultExplanation("UnknownEndpoint");
             mexDao.setStatus(Status.FAILURE.toString());
-            // , partnerEPR);
+            return;
         }
+
+        PortType portType = partnerLink.partnerLink.partnerRolePortType;
+        EndpointReference myRoleEpr = null; // TODO: how did we get this?
+
+        mexDao.setEPR(partnerEpr.toXML().getDocumentElement());
+        mexDao.setStatus(MessageExchange.Status.REQUEST.toString());
+        Set<InvocationStyle> supportedStyles = _contexts.mexContext.getSupportedInvocationStyle(partnerRoleChannel, partnerEpr);
+        if (supportedStyles.contains(InvocationStyle.RELIABLE)) {
+            // If RELIABLE is supported, this is easy, we just do it in-line.
+            ReliablePartnerRoleMessageExchangeImpl reliableMex = new ReliablePartnerRoleMessageExchangeImpl(_bpelProcess, mexDao
+                    .getMessageExchangeId(), portType, operation, partnerEpr, myRoleEpr, partnerRoleChannel);
+            _contexts.mexContext.invokePartnerReliable(reliableMex);
+        } else if (supportedStyles.contains(InvocationStyle.TRANSACTED)) {
+            // If TRANSACTED is supported, this is again easy, do it in-line.
+            TransactedPartnerRoleMessageExchangeImpl transactedMex = new TransactedPartnerRoleMessageExchangeImpl(_bpelProcess,
+                    mexDao.getMessageExchangeId(), portType, operation, partnerEpr, myRoleEpr, partnerRoleChannel);
+            _contexts.mexContext.invokePartnerTransacted(transactedMex);
+        } else if (supportedStyles.contains(InvocationStyle.BLOCKING)) {
+            // For BLOCKING invocation, we defer the call until after commit (unless idempotent).
+            BlockingPartnerRoleMessageExchangeImpl blockingMex = new BlockingPartnerRoleMessageExchangeImpl(_bpelProcess, mexDao
+                    .getMessageExchangeId(), portType, operation, partnerEpr, myRoleEpr, partnerRoleChannel);
+            schedule(new BlockingInvoker(blockingMex));
+        } else if (supportedStyles.contains(InvocationStyle.ASYNC)) {
+            // For ASYNC style, we defer the call until after commit (unless idempotent).
+            AsyncPartnerRoleMessageExchangeImpl asyncMex = new AsyncPartnerRoleMessageExchangeImpl(_bpelProcess, mexDao
+                    .getMessageExchangeId(), portType, operation, partnerEpr, myRoleEpr, partnerRoleChannel);
+            schedule(new AsyncInvoker(asyncMex));
+            
+        } else {
+            // This really should not happen, indicates IL is screwy.
+            __log.error("Integration Layer did not agree to any known invocation style for EPR " + partnerEpr);
+            mexDao.setFailureType(FailureType.COMMUNICATION_ERROR.toString());
+            mexDao.setStatus(Status.FAILURE.toString());
+            mexDao.setFaultExplanation("NoMatchingStyle");
+        }
+
+    }
+
+    private void schedule(final Runnable runnable) {
+        _contexts.registerCommitSynchronizer(new Runnable() {
+            public void run() {
+                _instanceWorker.enqueue(runnable);
+            }
+        });
     }
 
     /**
@@ -821,6 +829,31 @@
             __log.debug("Invoking in a p2p interaction, partnerrole " + partnerRoleMex.getMessageExchangeId());
         }
 
+        // following code is used to determine the style of invocation. note that for p2p, this is a bit of an
+        // "approximation", since we are using non-public mechanisms to control the child process. In any case
+        // the "in-memory" status of the caller and callee play an important role in determining the style.
+        InvocationStyle style;
+        if (_bpelProcess.isInMemory()) {
+            if (operation.getOutput() == null)
+                style = InvocationStyle.RELIABLE;
+            else if (target.isInMemory())
+                style = InvocationStyle.TRANSACTED;
+            else if (target.getSupportedInvocationStyle(serviceName).contains(InvocationStyle.TRANSACTED))
+                style = InvocationStyle.TRANSACTED;
+            else
+                style = InvocationStyle.BLOCKING;
+        } else /* persisted */{
+
+            if (operation.getOutput() != null
+                    && target.getSupportedInvocationStyle(serviceName).contains(InvocationStyle.TRANSACTED))
+                style = InvocationStyle.TRANSACTED;
+            else
+                style = InvocationStyle.RELIABLE;
+
+        }
+
+        partnerRoleMex.setInvocationStyle(style.toString());
+
         // Properties used by stateful-exchange protocol.
         String mySessionId = partnerRoleMex.getPartnerLink().getMySessionId();
         String partnerSessionId = partnerRoleMex.getPartnerLink().getPartnerSessionId();
@@ -840,6 +873,7 @@
         myRoleMex.setPattern(partnerRoleMex.getPattern());
         myRoleMex.setTimeout(partnerRoleMex.getTimeout());
         myRoleMex.setRequest(partnerRoleMex.getRequest());
+        myRoleMex.setInvocationStyle(style.toString());
 
         if (BpelProcess.__log.isDebugEnabled()) {
             __log.debug("Setting myRoleMex session ids for p2p interaction, mySession " + partnerSessionId + " - partnerSess "
@@ -857,7 +891,13 @@
     }
 
     void execute() {
+        if (!_contexts.isTransacted())
+            throw new BpelEngineException("MUST RUN IN TRANSACTION!");
+        if (_executed)
+            throw new IllegalStateException("cannot call execute() twice!");
+        
         long maxTime = System.currentTimeMillis() + _maxReductionTimeMs;
+        
 
         // Execute the process state reductions
         boolean canReduce = true;
@@ -889,12 +929,12 @@
                 // Max time exceeded (possibly an infinite loop).
                 if (__log.isDebugEnabled())
                     __log.debug("MaxTime exceeded for instance # " + _iid);
-                
-                // NOTE: we never ever schedule anything for in-mem processes, they have to finish in a single 
-                // go. 
+
+                // NOTE: we never ever schedule anything for in-mem processes, they have to finish in a single
+                // go.
                 if (_bpelProcess.isInMemory())
                     throw new BpelEngineException("In-memory process timeout.");
-                
+
                 try {
                     WorkEvent we = new WorkEvent();
                     we.setIID(_iid);
@@ -1074,61 +1114,43 @@
         }
     }
 
-    private void completeOutstandingMessageExchanges() {
+    /**
+     * Called when the process completes to clean up any outstanding message exchanges.
+     * 
+     */
+    private void cleanupOutstandingMyRoleExchanges(FaultData optionalFaultData) {
         String[] mexRefs = _outstandingRequests.releaseAll();
         for (String mexId : mexRefs) {
             MessageExchangeDAO mexDao = _dao.getConnection().getMessageExchange(mexId);
             if (mexDao != null) {
-                
-                
-                switch (mex.getStatus()) {
-                case ASYNC:
-                case RESPONSE:
-                    mex.setStatus(MessageExchange.Status.COMPLETED_OK);
-                    break;
-                case REQUEST:
-                    if (mex.getPattern().equals(MessageExchange.MessageExchangePattern.REQUEST_ONLY)) {
-                        mex.setStatus(MessageExchange.Status.COMPLETED_OK);
-                        break;
-                    }
-                default:
-                    mex.setFailure(FailureType.OTHER, "No response.", null);
-                    _bpelProcess._engine._contexts.mexContext.onAsyncReply(mex);
-                    mex.release();
+                Status status = MessageExchange.Status.valueOf(mexDao.getStatus());
+                MessageExchangePattern pattern = MessageExchange.MessageExchangePattern.valueOf(mexDao.getPattern());
+                InvocationStyle istyle = InvocationStyle.valueOf(mexDao.getInvocationStyle());
+                if (pattern == MessageExchangePattern.REQUEST_ONLY) {
+                    mexDao.setStatus(Status.COMPLETED_OK.toString());
+                    continue;
                 }
-            }
-        }
-    }
 
-    private void faultOutstandingMessageExchanges(FaultData faultData) {
-        String[] mexRefs = _outstandingRequests.releaseAll();
-        for (String mexId : mexRefs) {
-            MessageExchangeDAO mexDao = _dao.getConnection().getMessageExchange(mexId);
-            if (mexDao != null) {
-                ReliableMyRoleMessageExchangeImpl mex = new ReliableMyRoleMessageExchangeImpl(_bpelProcess._engine, mexDao);
-                _bpelProcess.initMyRoleMex(mex);
+                mexDao.setFailureType(FailureType.NO_RESPONSE.toString());
+                if (optionalFaultData != null) {
+                    mexDao.setFaultExplanation(optionalFaultData.toString());
+                }
+                mexDao.setFaultExplanation("Process completed without responding.");
 
-                Message message = mex.createMessage(faultData.getFaultName());
-                if (faultData.getFaultMessage() != null)
-                    message.setMessage(faultData.getFaultMessage());
-                mex.setResponse(message);
+                switch (istyle) {
+                case RELIABLE:
+                    scheduleReliableResponse(mexDao);
+                    break;
+                case ASYNC:
+                    scheduleAsyncResponse(mexDao);
+                }
 
-                mex.setFault(faultData.getFaultName(), message);
-                mex.setFaultExplanation(faultData.getExplanation());
-                _contexts.mexContext.onAsyncReply(mex);
             }
         }
     }
 
-    private void failOutstandingMessageExchanges() {
-        String[] mexRefs = _outstandingRequests.releaseAll();
-        for (String mexId : mexRefs) {
-            MessageExchangeDAO mexDao = _dao.getConnection().getMessageExchange(mexId);
-            ReliableMyRoleMessageExchangeImpl mex = new ReliableMyRoleMessageExchangeImpl(_bpelProcess, mexDao);
-            _bpelProcess.initMyRoleMex(mex);
-            mex.setFailure(FailureType.OTHER, "No response.", null);
-            _contexts.mexContext.onAsyncReply(mex);
-        }
+    private void cleanupOutstandingMyRoleExchanges() {
+        cleanupOutstandingMyRoleExchanges(null);
     }
 
     public Element getPartnerResponse(String mexId) {
@@ -1361,34 +1383,28 @@
 
         }
     }
-    
-    
 
     /**
-     * Add a scheduled ASYNC response. 
+     * Add a scheduled ASYNC response.
      * 
      * @param messageExchangeId
      */
-    private void scheduleAsyncResponse(String messageExchangeId) {
+    private void scheduleAsyncResponse(MessageExchangeDAO mexdao) {
         assert !_bpelProcess.isInMemory() : "Internal error; attempt to schedule in-memory process";
-        assert _contexts.scheduler.isTransacted(); 
-        
-        WorkEvent we = new WorkEvent();
-        we.setIID(_iid);
-        we.setMexId(messageExchangeId);
-        we.setProcessId(_bpelProcess.getPID());
-        we.setType(WorkEvent.Type.MYROLE_INVOKE_ASYNC_RESPONSE);
-        _contexts.scheduler.scheduleVolatileJob(false,null);
-        
+        assert _contexts.isTransacted();
+
+        final MyRoleMessageExchangeImpl mex = _bpelProcess.createMyRoleMex(mexdao);
+        _pendingMyRoleReplies.add(mex);
     }
 
-    private void scheduleReliableResponse(String messageExchangeId) {
+
+    private void scheduleReliableResponse(MessageExchangeDAO messageExchange) {
         assert !_bpelProcess.isInMemory() : "Internal error; attempt to schedule in-memory process";
-        assert _contexts.scheduler.isTransacted(); 
-        
+        assert _contexts.isTransacted();
+
         WorkEvent we = new WorkEvent();
         we.setIID(_iid);
-        we.setMexId(messageExchangeId);
+        we.setMexId(messageExchange.getMessageExchangeId());
         we.setProcessId(_bpelProcess.getPID());
         we.setType(WorkEvent.Type.MYROLE_INVOKE_ASYNC_RESPONSE);
         _contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
@@ -1403,6 +1419,31 @@
     private void continuePartnerReplied(MessageExchangeDAO pmex) {
 
     }
+    
+    class BlockingInvoker implements Runnable {
 
+        public BlockingInvoker(BlockingPartnerRoleMessageExchangeImpl blockingMex) {
+            // TODO Auto-generated constructor stub
+        }
 
+        public void run() {
+            // TODO Auto-generated method stub
+            
+        }
+        
+    }
+
+    
+    class AsyncInvoker implements Runnable {
+
+        public AsyncInvoker(AsyncPartnerRoleMessageExchangeImpl asyncMex) {
+            // TODO Auto-generated constructor stub
+        }
+
+        public void run() {
+            // TODO Auto-generated method stub
+            
+        }
+        
+    }
 }

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java Tue Jul 24 13:58:12 2007
@@ -27,6 +27,9 @@
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -64,8 +67,6 @@
 import org.apache.ode.utils.stl.CollectionsX;
 import org.apache.ode.utils.stl.MemberOfFunction;
 
-import com.sun.corba.se.spi.activation._ActivatorImplBase;
-
 /**
  * <p>
  * The BPEL server implementation.
@@ -114,6 +115,8 @@
 
     private Properties _configProperties;
 
+    private ExecutorService _exec;
+
     BpelDatabase _db;
 
     /**
@@ -156,6 +159,10 @@
 
             __log.debug("BPEL SERVER starting.");
 
+
+            if (_exec == null)
+                _exec = Executors.newCachedThreadPool();
+
             _contexts.scheduler.start();
             _state = State.RUNNING;
             __log.info(__msgs.msgServerStarted());
@@ -226,7 +233,7 @@
 
             __log.debug("BPEL SERVER initializing ");
 
-            _db = new BpelDatabase(_contexts.dao, _contexts.scheduler);
+            _db = new BpelDatabase(_contexts);
             _state = State.INIT;
 
         } finally {
@@ -393,18 +400,26 @@
         }
     }
 
-    public void onScheduledJob(JobInfo jobInfo) throws JobProcessorException {
+    public void onScheduledJob(final JobInfo jobInfo) throws JobProcessorException {
         _mngmtLock.readLock().lock();
         try {
-            WorkEvent we = new WorkEvent(jobInfo.jobDetail);
+            final WorkEvent we = new WorkEvent(jobInfo.jobDetail);
             BpelProcess process = _registeredProcesses.get(we.getProcessId());
             if (process == null) {
                 // If the process is not active, it means that we should not be
                 // doing any work on its behalf, therefore we will reschedule the
                 // events for some time in the future (1 minute).
-                Date future = new Date(System.currentTimeMillis() + (60 * 1000));
-                __log.info(__msgs.msgReschedulingJobForInactiveProcess(we.getProcessId(), jobInfo.jobName, future));
-                _contexts.scheduler.schedulePersistedJob(we.getDetail(), future);
+                _contexts.execTransaction(new Callable<Void>() {
+
+                    public Void call() throws Exception {
+                        _contexts.scheduler.jobCompleted(jobInfo.jobName);
+                        Date future = new Date(System.currentTimeMillis() + (60 * 1000));
+                        __log.info(__msgs.msgReschedulingJobForInactiveProcess(we.getProcessId(), jobInfo.jobName, future));
+                        _contexts.scheduler.schedulePersistedJob(we.getDetail(), future);            
+                        return null;
+                    }
+                    
+                });
                 return;
             }
 
@@ -481,7 +496,7 @@
             switch (istyle) {
             case ASYNC:
                 try {
-                    mexId = _contexts.scheduler.execIsolatedTransaction(createDao).get();
+                    mexId = _contexts.execTransaction(createDao);
                 } catch (Exception e) {
                     __log.error("Internal Error: could not execute isolated transaction.", e);
                     throw new BpelEngineException("Internal Error", e);
@@ -490,7 +505,7 @@
                 break;
             case BLOCKING:
                 try {
-                    mexId = _contexts.scheduler.execIsolatedTransaction(createDao).get();
+                    mexId = _contexts.execTransaction(createDao);
                 } catch (Exception e) {
                     __log.error("Internal Error: could not execute isolated transaction.", e);
                     throw new BpelEngineException("Internal Error", e);
@@ -576,7 +591,7 @@
                     return loadMex.call();
 
                 // TODO: should we not do this in the current thread if the mex is a transacted/reliable?
-                return _contexts.scheduler.execIsolatedTransaction(loadMex).get();
+                return execIsolatedTransaction(loadMex).get();
             } catch (ContextException e) {
                 throw new BpelEngineException(e);
             } catch (Exception e) {
@@ -621,6 +636,7 @@
         
         return null;
     }
+    
     void registerMessageExchangeStateListener(MessageExchangeStateListener mexStateListener) {
         WeakReference<MessageExchangeStateListener> ref = new WeakReference<MessageExchangeStateListener>(mexStateListener);
 
@@ -641,8 +657,27 @@
         }
     }
 
+
+    <T> Future<T> execIsolatedTransaction(final Callable<T> transaction) throws ContextException {
+        return _exec.submit(new Callable<T>() {
+            public T call() throws Exception {
+                
+                return _contexts.execTransaction(transaction);
+            }
+        });
+    }
+
+    /**
+     * Schedule a {@link Runnable} object for execution after the completion of the current transaction. 
+     * @param runnable
+     */
+    void scheduleRunnable(Runnable runnable) {
+        assertTransaction();
+        _contexts.registerCommitSynchronizer(runnable);
+    }
+    
     protected void assertTransaction() {
-        if (!_contexts.scheduler.isTransacted())
+        if (!_contexts.isTransacted())
             throw new BpelEngineException("Operation must be performed in a transaction!");
     }
 
@@ -724,4 +759,6 @@
             _mngmtLock.readLock().unlock();
         }
     }
+
+    
 }

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java Tue Jul 24 13:58:12 2007
@@ -21,21 +21,30 @@
 
 import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
 import org.apache.ode.bpel.iapi.BindingContext;
+import org.apache.ode.bpel.iapi.BpelEngineException;
 import org.apache.ode.bpel.iapi.BpelEventListener;
+import org.apache.ode.bpel.iapi.ContextException;
 import org.apache.ode.bpel.iapi.EndpointReferenceContext;
 import org.apache.ode.bpel.iapi.MessageExchangeContext;
 import org.apache.ode.bpel.iapi.Scheduler;
 import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
 
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CopyOnWriteArrayList;
 
+import javax.transaction.Status;
+import javax.transaction.Synchronization;
+import javax.transaction.SystemException;
+import javax.transaction.TransactionManager;
+
 /**
- * Aggregation of all the contexts provided to the BPEL engine by the
- * integration layer.
+ * Aggregation of all the contexts provided to the BPEL engine by the integration layer.
  */
 class Contexts {
 
+    TransactionManager txManager;
+
     MessageExchangeContext mexContext;
 
     Scheduler scheduler;
@@ -46,11 +55,84 @@
 
     BpelDAOConnectionFactory dao;
 
-    /** Global Message-Exchange interceptors. Must be copy-on-write!!! */ 
-    final List<MessageExchangeInterceptor >globalIntereceptors = new CopyOnWriteArrayList<MessageExchangeInterceptor>();
+    /** Global Message-Exchange interceptors. Must be copy-on-write!!! */
+    final List<MessageExchangeInterceptor> globalIntereceptors = new CopyOnWriteArrayList<MessageExchangeInterceptor>();
 
     /** Global event listeners. Must be copy-on-write!!! */
     final List<BpelEventListener> eventListeners = new CopyOnWriteArrayList<BpelEventListener>();
 
+    public boolean isTransacted() {
+        try {
+            return txManager.getStatus() == Status.STATUS_ACTIVE;
+        } catch (SystemException e) {
+            throw new BpelEngineException(e);
+        }
+    }
+
+    public void execTransaction(final Runnable transaction) {
+        try {
+            execTransaction(new Callable<Void>() {
+
+                public Void call() throws Exception {
+                    transaction.run();
+                    return null;
+                }
+
+            });
+        } catch (Exception e) {
+            throw new BpelEngineException(e);
+        }
+
+    }
+
+    public <T> T execTransaction(Callable<T> transaction) throws Exception{
+        try {
+            txManager.begin();
+        } catch (Exception ex) {
+            String errmsg = "Internal Error, could not begin transaction.";
+            throw new BpelEngineException(errmsg, ex);
+        }
+        boolean success = false;
+        try {
+            T retval = transaction.call();
+            success = true;
+            return retval;
+        } catch (Exception ex) {
+            throw ex;
+        } finally {
+            if (success)
+                try {
+                    txManager.commit();
+                } catch (Exception ex) {
+                    throw new BpelEngineException("Could not commit.", ex);
+                }
+            else
+                try {
+                    txManager.rollback();
+                } catch (Exception ex) {
+                    throw new BpelEngineException("Could not rollback.", ex);
+
+                }
+        }
+    }
+
+    public void registerCommitSynchronizer(final Runnable runnable) {
+        try {
+            txManager.getTransaction().registerSynchronization(new Synchronization() {
+
+                public void afterCompletion(int status) {
+                    if (status == Status.STATUS_COMMITTED)
+                        runnable.run();
+                }
+
+                public void beforeCompletion() {
+
+                }
+                
+            });
+        } catch (Exception ex) {
+            throw new BpelEngineException("Error registering synchronizer." ,ex);
+        }
+    }
 
 }

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java Tue Jul 24 13:58:12 2007
@@ -80,8 +80,7 @@
      */
     DebuggerSupport(BpelProcess process) {
         _process = process;
-        _db = new BpelProcessDatabase(_process._contexts.dao,
-                _process._contexts.scheduler,
+        _db = new BpelProcessDatabase(_process._contexts,
                 _process._pid);
 
     }

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java Tue Jul 24 13:58:12 2007
@@ -401,7 +401,7 @@
     }
 
     protected void assertTransaction() {
-        if (!_contexts.scheduler.isTransacted())
+        if (!_contexts.isTransacted())
             throw new BpelEngineException("Operation must be performed in a transaction!");
     }
 
@@ -411,7 +411,7 @@
             return action.call(getDAO());
         } else {
             try {
-                return _contexts.scheduler.execIsolatedTransaction(new Callable<T>() {
+                return _process._server.execIsolatedTransaction(new Callable<T>() {
                     public T call() throws Exception {
                         assertTransaction();
                         return action.call(getDAO());

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java Tue Jul 24 13:58:12 2007
@@ -100,7 +100,7 @@
     protected void scheduleInvoke(BpelProcess target) {
         
         assert !_process.isInMemory() : "Cannot schedule invokes for in-memory processes.";
-        assert _contexts.scheduler.isTransacted() : "Cannot schedule outside of transaction context.";
+        assert _contexts.isTransacted() : "Cannot schedule outside of transaction context.";
         
         // Schedule a new job for invocation
         final WorkEvent we = new WorkEvent();

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java Tue Jul 24 13:58:12 2007
@@ -32,10 +32,12 @@
 import org.apache.ode.bpel.evt.CorrelationNoMatchEvent;
 import org.apache.ode.bpel.evt.NewProcessInstanceEvent;
 import org.apache.ode.bpel.iapi.Endpoint;
+import org.apache.ode.bpel.iapi.InvocationStyle;
 import org.apache.ode.bpel.iapi.MessageExchange;
 import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
 import org.apache.ode.bpel.iapi.ProcessState;
 import org.apache.ode.bpel.iapi.MessageExchange.Status;
+import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus;
 import org.apache.ode.bpel.intercept.InterceptorInvoker;
 import org.apache.ode.bpel.o.OMessageVarType;
 import org.apache.ode.bpel.o.OPartnerLink;
@@ -86,7 +88,7 @@
      * @param mex
      *            exchange to which the message is related
      */
-    public void invokeMyRole(MessageExchangeDAO mex) {
+    public CorrelationStatus invokeMyRole(MessageExchangeDAO mex) {
         if (__log.isTraceEnabled()) {
             __log.trace(ObjectPrinter.stringifyMethodEnter(this + ":inputMsgRcvd", new Object[] { "messageExchange", mex }));
         }
@@ -97,151 +99,134 @@
             mex.setStatus(Status.FAILURE.toString());
             mex.setFailureType(MessageExchange.FailureType.UNKNOWN_OPERATION.toString());
             mex.setFaultExplanation(mex.getOperation());
-            return;
+            return null;
         }
 
         // Is this a /possible/ createInstance Operation?
         boolean isCreateInstnace = _plinkDef.isCreateInstanceOperation(operation);
-
-        // now, the tricks begin: when a message arrives we have to see if there is anyone waiting for it. Get the correlator, a
-        // persisted communnication-reduction data structure supporting correlation correlationKey matching!
         String correlatorId = BpelProcess.genCorrelatorId(_plinkDef, operation.getName());
-
         CorrelatorDAO correlator = _process.getProcessDAO().getCorrelator(correlatorId);
 
-        CorrelationKey[] keys;
-        MessageRouteDAO messageRoute = null;
+        // Special logic for in-mem processes, only createInstance is allowed, so we can skip the
+        // correlation BS to save time.
+        if (_process.isInMemory()) {
+            if (isCreateInstnace)
+                invokeMyRoleCreateInstance(mex, operation, correlatorId, correlator);
+            else {
+                mex.setStatus(Status.FAILURE.toString());
+                mex.setFailureType(MessageExchange.FailureType.OTHER.toString());
+                mex.setFaultExplanation("Invalid in-memory process: non createInstance operations are not supported!");
+                return null;
+            }
 
-        // We need to compute the correlation keys (based on the operation
-        // we can infer which correlation keys to compute - this is merely a set
-        // consisting of each correlationKey used in each correlation sets
-        // that is ever referenced in an <receive>/<onMessage> on this
-        // partnerlink/operation.
-        try {
-            keys = computeCorrelationKeys(mex, operation);
-        } catch (InvalidMessageException ime) {
-            // We'd like to do a graceful exit here, no sense in rolling back due to a
-            // a message format problem.
-            __log.debug("Unable to evaluate correlation keys, invalid message format. ", ime);
-            mex.setFailureType(MessageExchange.FailureType.FORMAT_ERROR.toString());
-            mex.setStatus(Status.FAILURE.toString());
-            mex.setFaultExplanation(ime.getMessage());
+        } else {
 
-            return;
-        }
+            MessageRouteDAO messageRoute = null;
 
-        String mySessionId = mex.getProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID);
-        String partnerSessionId = mex.getProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID);
-        if (__log.isDebugEnabled()) {
-            __log.debug("INPUTMSG: " + correlatorId + ": MSG RCVD keys=" + ArrayUtils.makeCollection(HashSet.class, keys)
-                    + " mySessionId=" + mySessionId + " partnerSessionId=" + partnerSessionId);
-        }
+            // now, the tricks begin: when a message arrives we have to see if there is anyone waiting for it. Get the correlator, a
+            // persisted communnication-reduction data structure supporting correlation correlationKey matching!
 
-        CorrelationKey matchedKey = null;
+            CorrelationKey[] keys;
 
-        // Try to find a route for one of our keys.
-        for (CorrelationKey key : keys) {
-            messageRoute = correlator.findRoute(key);
-            if (messageRoute != null) {
-                if (__log.isDebugEnabled()) {
-                    __log.debug("INPUTMSG: " + correlatorId + ": ckey " + key + " route is to " + messageRoute);
-                }
-                matchedKey = key;
-                break;
-            }
-        }
+            // We need to compute the correlation keys (based on the operation
+            // we can infer which correlation keys to compute - this is merely a set
+            // consisting of each correlationKey used in each correlation sets
+            // that is ever referenced in an <receive>/<onMessage> on this
+            // partnerlink/operation.
+            try {
+                keys = computeCorrelationKeys(mex, operation);
+            } catch (InvalidMessageException ime) {
+                // We'd like to do a graceful exit here, no sense in rolling back due to a
+                // a message format problem.
+                __log.debug("Unable to evaluate correlation keys, invalid message format. ", ime);
+                mex.setFailureType(MessageExchange.FailureType.FORMAT_ERROR.toString());
+                mex.setStatus(Status.FAILURE.toString());
+                mex.setFaultExplanation(ime.getMessage());
 
-        // TODO - ODE-58
+                return null;
+            }
 
-        // If no luck, and this operation qualifies for create-instance
-        // treatment, then create a new process
-        // instance.
-        if (messageRoute == null && isCreateInstnace) {
+            String mySessionId = mex.getProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID);
+            String partnerSessionId = mex.getProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID);
             if (__log.isDebugEnabled()) {
-                __log.debug("INPUTMSG: " + correlatorId + ": routing failed, CREATING NEW INSTANCE");
+                __log.debug("INPUTMSG: " + correlatorId + ": MSG RCVD keys=" + ArrayUtils.makeCollection(HashSet.class, keys)
+                        + " mySessionId=" + mySessionId + " partnerSessionId=" + partnerSessionId);
             }
-            ProcessDAO processDAO = _process.getProcessDAO();
 
-            if (_process._pconf.getState() == ProcessState.RETIRED) {
-                throw new InvalidProcessException("Process is retired.", InvalidProcessException.RETIRED_CAUSE_CODE);
+            CorrelationKey matchedKey = null;
+
+            // Try to find a route for one of our keys.
+            for (CorrelationKey key : keys) {
+                messageRoute = correlator.findRoute(key);
+                if (messageRoute != null) {
+                    if (__log.isDebugEnabled()) {
+                        __log.debug("INPUTMSG: " + correlatorId + ": ckey " + key + " route is to " + messageRoute);
+                    }
+                    matchedKey = key;
+                    break;
+                }
             }
 
-            // if (!_process.processInterceptors(mex, InterceptorInvoker.__onNewInstanceInvoked)) {
-            // __log.debug("Not creating a new instance for mex " + mex + "; interceptor prevented!");
-            // return;
-            // }
-
-            ProcessInstanceDAO newInstance = processDAO.createInstance(correlator);
-
-            BpelRuntimeContextImpl instance = _process.createRuntimeContext(newInstance, new PROCESS(_process.getOProcess()), mex);
-
-            // send process instance event
-            NewProcessInstanceEvent evt = new NewProcessInstanceEvent(new QName(_process.getOProcess().targetNamespace, _process
-                    .getOProcess().getName()), _process.getProcessDAO().getProcessId(), newInstance.getInstanceId());
-            evt.setPortType(mex.getPortType());
-            evt.setOperation(operation.getName());
-            evt.setMexId(mex.getMessageExchangeId());
-            _process._debugger.onEvent(evt);
-            _process.saveEvent(evt, newInstance);
-            mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.CREATE_INSTANCE.toString());
-            mex.setInstance(newInstance);
+            // TODO - ODE-58
 
-            instance.execute();
-        } else if (messageRoute != null) {
-            if (__log.isDebugEnabled()) {
-                __log.debug("INPUTMSG: " + correlatorId + ": ROUTING to instance "
-                        + messageRoute.getTargetInstance().getInstanceId());
-            }
+            // If no luck, and this operation qualifies for create-instance
+            // treatment, then create a new process
+            // instance.
+            if (messageRoute == null && isCreateInstnace) {
+                invokeMyRoleCreateInstance(mex, operation, correlatorId, correlator);
+            } else if (messageRoute != null) {
+                if (__log.isDebugEnabled()) {
+                    __log.debug("INPUTMSG: " + correlatorId + ": ROUTING to instance "
+                            + messageRoute.getTargetInstance().getInstanceId());
+                }
 
-            ProcessInstanceDAO instanceDao = messageRoute.getTargetInstance();
+                ProcessInstanceDAO instanceDao = messageRoute.getTargetInstance();
 
-            // Reload process instance for DAO.
-            BpelRuntimeContextImpl instance = _process.createRuntimeContext(instanceDao, null, null);
-            instance.inputMsgMatch(messageRoute.getGroupId(), messageRoute.getIndex(), mex);
-
-            // Kill the route so some new message does not get routed to
-            // same process instance.
-            correlator.removeRoutes(messageRoute.getGroupId(), instanceDao);
-
-            // send process instance event
-            CorrelationMatchEvent evt = new CorrelationMatchEvent(new QName(_process.getOProcess().targetNamespace, _process
-                    .getOProcess().getName()), _process.getProcessDAO().getProcessId(), instanceDao.getInstanceId(), matchedKey);
-            evt.setPortType(mex.getPortType());
-            evt.setOperation(operation.getName());
-            evt.setMexId(mex.getMessageExchangeId());
-
-            _process._debugger.onEvent(evt);
-            // store event
-            _process.saveEvent(evt, instanceDao);
-
-            mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.MATCHED.toString());
-            mex.setInstance(messageRoute.getTargetInstance());
-            instance.execute();
-        } else {
-            if (__log.isDebugEnabled()) {
-                __log.debug("INPUTMSG: " + correlatorId + ": SAVING to DB (no match) ");
+                // Reload process instance for DAO.
+
+                // Kill the route so some new message does not get routed to
+                // same process instance.
+                correlator.removeRoutes(messageRoute.getGroupId(), instanceDao);
+
+                // send process instance event
+                CorrelationMatchEvent evt = new CorrelationMatchEvent(new QName(_process.getOProcess().targetNamespace, _process
+                        .getOProcess().getName()), _process.getProcessDAO().getProcessId(), instanceDao.getInstanceId(), matchedKey);
+                evt.setPortType(mex.getPortType());
+                evt.setOperation(operation.getName());
+                evt.setMexId(mex.getMessageExchangeId());
+
+                _process._debugger.onEvent(evt);
+                // store event
+                _process.saveEvent(evt, instanceDao);
+
+                mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.MATCHED.toString());
+                mex.setInstance(messageRoute.getTargetInstance());
+                
+                // We're overloading the channel here to be the PICK response channel +  index
+                mex.setChannel(messageRoute.getGroupId() + "&" + messageRoute.getIndex());
+            } else {
+                if (__log.isDebugEnabled()) {
+                    __log.debug("INPUTMSG: " + correlatorId + ": SAVING to DB (no match) ");
+                }
+
+                // TODO: Revist (BART)
+                // if (!mex.isAsynchronous()) {
+                // mex.setFailure(MessageExchange.FailureType.NOMATCH, "No process instance matching correlation keys.", null);
+                //
+                // } else {
+                // send event
+                CorrelationNoMatchEvent evt = new CorrelationNoMatchEvent(mex.getPortType(), mex.getOperation(), mex
+                        .getMessageExchangeId(), keys);
+
+                evt.setProcessId(_process.getProcessDAO().getProcessId());
+                evt.setProcessName(new QName(_process.getOProcess().targetNamespace, _process.getOProcess().getName()));
+                _process._debugger.onEvent(evt);
+
+                mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.QUEUED.toString());
+                correlator.enqueueMessage(mex, keys);
             }
 
-            // TODO: Revist (BART)
-            // if (!mex.isAsynchronous()) {
-            // mex.setFailure(MessageExchange.FailureType.NOMATCH, "No process instance matching correlation keys.", null);
-            //
-            // } else {
-            // send event
-            CorrelationNoMatchEvent evt = new CorrelationNoMatchEvent(mex.getPortType(), mex.getOperation(), mex
-                    .getMessageExchangeId(), keys);
-
-            evt.setProcessId(_process.getProcessDAO().getProcessId());
-            evt.setProcessName(new QName(_process.getOProcess().targetNamespace, _process.getOProcess().getName()));
-            _process._debugger.onEvent(evt);
-
-            mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.QUEUED.toString());
-
-            // No match, means we add message exchange to the queue.
-            correlator.enqueueMessage(mex, keys);
-            // }
         }
-
         // Now we have to update our message exchange status. If the <reply>
         // was not hit during the
         // invocation, then we will be in the "REQUEST" phase which means
@@ -250,6 +235,39 @@
         if (Status.valueOf(mex.getStatus()) == MessageExchange.Status.REQUEST) {
             mex.setStatus(MessageExchange.Status.ASYNC.toString());
         }
+        
+        return CorrelationStatus.valueOf(mex.getCorrelationStatus());
+    }
+
+    private void invokeMyRoleCreateInstance(MessageExchangeDAO mex, Operation operation, String correlatorId,
+            CorrelatorDAO correlator) {
+        if (__log.isDebugEnabled()) {
+            __log.debug("INPUTMSG: " + correlatorId + ": routing failed, CREATING NEW INSTANCE");
+        }
+        ProcessDAO processDAO = _process.getProcessDAO();
+
+        if (_process._pconf.getState() == ProcessState.RETIRED) {
+            throw new InvalidProcessException("Process is retired.", InvalidProcessException.RETIRED_CAUSE_CODE);
+        }
+
+        // if (!_process.processInterceptors(mex, InterceptorInvoker.__onNewInstanceInvoked)) {
+        // __log.debug("Not creating a new instance for mex " + mex + "; interceptor prevented!");
+        // return;
+        // }
+
+        ProcessInstanceDAO newInstance = processDAO.createInstance(correlator);
+
+        // send process instance event
+        NewProcessInstanceEvent evt = new NewProcessInstanceEvent(new QName(_process.getOProcess().targetNamespace, _process
+                .getOProcess().getName()), _process.getProcessDAO().getProcessId(), newInstance.getInstanceId());
+        evt.setPortType(mex.getPortType());
+        evt.setOperation(operation.getName());
+        evt.setMexId(mex.getMessageExchangeId());
+        _process._debugger.onEvent(evt);
+        _process.saveEvent(evt, newInstance);
+        mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.CREATE_INSTANCE.toString());
+        mex.setInstance(newInstance);
+
     }
 
     @SuppressWarnings("unchecked")

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java Tue Jul 24 13:58:12 2007
@@ -39,15 +39,4 @@
         _initialPartner = initialPartner;
     }
 
-    public void processPartnerResponse(PartnerRoleMessageExchangeImpl messageExchange) {
-        if (__log.isDebugEnabled()) {
-            __log.debug("Processing partner's response for partnerLink: " + messageExchange);
-        }
-
-        BpelRuntimeContextImpl processInstance =
-                _process.createRuntimeContext(messageExchange.getDAO().getInstance(), null, null);
-        processInstance.invocationResponse(messageExchange);
-        processInstance.execute();
-    }
-
 }

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java Tue Jul 24 13:58:12 2007
@@ -22,7 +22,7 @@
     protected void checkReplyContextOk() {
         super.checkReplyContextOk();
         
-        if (!_contexts.scheduler.isTransacted())
+        if (!_contexts.isTransacted())
             throw new BpelEngineException("Cannot replyXXX from non-transaction context!");
     }
 
@@ -40,7 +40,7 @@
     @Override
     protected void resumeInstance() {
         // TODO Auto-generated method stub
-        assert _contexts.scheduler.isTransacted() : "checkReplyContext() should have prevented us from getting here.";
+        assert _contexts.isTransacted() : "checkReplyContext() should have prevented us from getting here.";
         assert !_process.isInMemory() : "resumeInstance() for reliable in-mem processes makes no sense.";
 
         final WorkEvent we = generateInvokeResponseWorkEvent();

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java Tue Jul 24 13:58:12 2007
@@ -23,15 +23,11 @@
     public Status invokeTransacted() throws BpelEngineException {
         assertTransaction();
        
-        boolean success = false;
-        try {
-            _process.invokeProcess(getDAO());
-            if (MessageExchange.Status.valueOf(getDAO().getStatus()) != Status.RESPONSE)
-                throw new BpelEngineException("Transactional invoke on process did not yield a response.");
-            success = true;
-        } finally {
-            
-        }
+        _process.invokeProcess(getDAO());
+        if (MessageExchange.Status.valueOf(getDAO().getStatus()) != Status.RESPONSE)
+            throw new BpelEngineException("Transactional invoke on process did not yield a response.");
+        return Status.valueOf(getDAO().getStatus());
+        
     }
 
     @Override

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionFactoryImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionFactoryImpl.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionFactoryImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionFactoryImpl.java Tue Jul 24 13:58:12 2007
@@ -22,6 +22,7 @@
 import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
 import org.apache.ode.bpel.iapi.Scheduler;
 
+import javax.transaction.TransactionManager;
 import javax.xml.namespace.QName;
 import java.util.HashMap;
 import java.util.Map;
@@ -33,14 +34,14 @@
 public class BpelDAOConnectionFactoryImpl implements BpelDAOConnectionFactory {
     private static final Map<QName, ProcessDaoImpl> __StateStore = new HashMap<QName, ProcessDaoImpl>();
 
-    private Scheduler _scheduler;
+    private TransactionManager _txm;
 
-    public BpelDAOConnectionFactoryImpl(Scheduler sched) {
-        _scheduler = sched;
+    public BpelDAOConnectionFactoryImpl(TransactionManager txm) {
+        _txm = txm;
     }
 
     public BpelDAOConnection getConnection() {
-        return new BpelDAOConnectionImpl(__StateStore, _scheduler);
+        return new BpelDAOConnectionImpl(__StateStore, _txm);
     }
 
     /**

Modified: incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java Tue Jul 24 13:58:12 2007
@@ -35,6 +35,10 @@
 import org.apache.ode.utils.stl.CollectionsX;
 import org.apache.ode.utils.stl.UnaryFunction;
 
+import javax.transaction.RollbackException;
+import javax.transaction.Synchronization;
+import javax.transaction.SystemException;
+import javax.transaction.TransactionManager;
 import javax.xml.namespace.QName;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -47,22 +51,26 @@
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
-
 /**
  * A very simple, in-memory implementation of the {@link BpelDAOConnection} interface.
  */
 class BpelDAOConnectionImpl implements BpelDAOConnection {
     private static final Log __log = LogFactory.getLog(BpelDAOConnectionImpl.class);
 
-    private Scheduler _scheduler;
+    private TransactionManager _txm;
+
     private Map<QName, ProcessDaoImpl> _store;
+
     private List<BpelEvent> _events = new LinkedList<BpelEvent>();
-    private static Map<String,MessageExchangeDAO> _mexStore = Collections.synchronizedMap(new HashMap<String,MessageExchangeDAO>());
+
+    private static Map<String, MessageExchangeDAO> _mexStore = Collections
+            .synchronizedMap(new HashMap<String, MessageExchangeDAO>());
+
     private static AtomicLong counter = new AtomicLong(Long.MAX_VALUE / 2);
 
-    BpelDAOConnectionImpl(Map<QName, ProcessDaoImpl> store, Scheduler scheduler) {
+    BpelDAOConnectionImpl(Map<QName, ProcessDaoImpl> store, TransactionManager txm) {
         _store = store;
-        _scheduler = scheduler;
+        _txm = txm;
     }
 
     public ProcessDAO getProcess(QName processId) {
@@ -70,8 +78,8 @@
     }
 
     public ProcessDAO createProcess(QName pid, QName type, String guid, long version) {
-        ProcessDaoImpl process = new ProcessDaoImpl(this,_store,pid,type, guid,version);
-        _store.put(pid,process);
+        ProcessDaoImpl process = new ProcessDaoImpl(this, _store, pid, type, guid, version);
+        _store.put(pid, process);
         return process;
     }
 
@@ -85,13 +93,12 @@
     }
 
     public Collection<ProcessInstanceDAO> instanceQuery(InstanceFilter filter) {
-        if(filter.getLimit()==0) {
+        if (filter.getLimit() == 0) {
             return Collections.EMPTY_LIST;
         }
         List<ProcessInstanceDAO> matched = new ArrayList<ProcessInstanceDAO>();
         // Selecting
-        selectionCompleted:
-        for (ProcessDaoImpl proc : _store.values()) {
+        selectionCompleted: for (ProcessDaoImpl proc : _store.values()) {
             boolean pmatch = true;
             if (filter.getNameFilter() != null
                     && !equalsOrWildcardMatch(filter.getNameFilter(), proc.getProcessId().getLocalPart()))
@@ -107,9 +114,11 @@
                     if (filter.getStatusFilter() != null) {
                         boolean statusMatch = false;
                         for (Short status : filter.convertFilterState()) {
-                            if (inst.getState() == status.byteValue()) statusMatch = true;
+                            if (inst.getState() == status.byteValue())
+                                statusMatch = true;
                         }
-                        if (!statusMatch) match = false;
+                        if (!statusMatch)
+                            match = false;
                     }
                     if (filter.getStartedDateFilter() != null
                             && !dateMatch(filter.getStartedDateFilter(), inst.getCreateTime(), filter))
@@ -118,25 +127,25 @@
                             && !dateMatch(filter.getLastActiveDateFilter(), inst.getLastActiveTime(), filter))
                         match = false;
 
-//                    if (filter.getPropertyValuesFilter() != null) {
-//                        for (Map.Entry propEntry : filter.getPropertyValuesFilter().entrySet()) {
-//                            boolean entryMatched = false;
-//                            for (ProcessPropertyDAO prop : proc.getProperties()) {
-//                                if (prop.getName().equals(propEntry.getKey())
-//                                        && (propEntry.getValue().equals(prop.getMixedContent())
-//                                        || propEntry.getValue().equals(prop.getSimpleContent()))) {
-//                                    entryMatched = true;
-//                                }
-//                            }
-//                            if (!entryMatched) {
-//                                match = false;
-//                            }
-//                        }
-//                    }
+                    // if (filter.getPropertyValuesFilter() != null) {
+                    // for (Map.Entry propEntry : filter.getPropertyValuesFilter().entrySet()) {
+                    // boolean entryMatched = false;
+                    // for (ProcessPropertyDAO prop : proc.getProperties()) {
+                    // if (prop.getName().equals(propEntry.getKey())
+                    // && (propEntry.getValue().equals(prop.getMixedContent())
+                    // || propEntry.getValue().equals(prop.getSimpleContent()))) {
+                    // entryMatched = true;
+                    // }
+                    // }
+                    // if (!entryMatched) {
+                    // match = false;
+                    // }
+                    // }
+                    // }
 
                     if (match) {
                         matched.add(inst);
-                        if(matched.size()==filter.getLimit()) {
+                        if (matched.size() == filter.getLimit()) {
                             break selectionCompleted;
                         }
                     }
@@ -149,9 +158,10 @@
 
             Collections.sort(matched, new Comparator<ProcessInstanceDAO>() {
                 public int compare(ProcessInstanceDAO o1, ProcessInstanceDAO o2) {
-                    for (String orderKey: orders) {
+                    for (String orderKey : orders) {
                         int result = compareInstanceUsingKey(orderKey, o1, o2);
-                        if (result != 0) return result;
+                        if (result != 0)
+                            return result;
                     }
                     return 0;
                 }
@@ -173,8 +183,8 @@
 
     public MessageExchangeDAO createMessageExchange(char dir) {
         String id = Long.toString(counter.getAndIncrement());
-        MessageExchangeDAO mex = new MessageExchangeDAOImpl(dir,id);
-        _mexStore.put(id,mex);
+        MessageExchangeDAO mex = new MessageExchangeDAOImpl(dir, id);
+        _mexStore.put(id, mex);
         return mex;
     }
 
@@ -189,7 +199,8 @@
         String orderKey = key;
         if (key.startsWith("+") || key.startsWith("-")) {
             orderKey = key.substring(1, key.length());
-            if (key.startsWith("-")) ascending = false;
+            if (key.startsWith("-"))
+                ascending = false;
         }
         ProcessDAO process1 = getProcess(instanceDAO1.getProcess().getProcessId());
         ProcessDAO process2 = getProcess(instanceDAO2.getProcess().getProcessId());
@@ -203,11 +214,11 @@
             s1 = process1.getProcessId().getNamespaceURI();
             s2 = process2.getProcessId().getNamespaceURI();
         } else if ("version".equals(orderKey)) {
-            s1 = ""+process1.getVersion();
-            s2 = ""+process2.getVersion();
+            s1 = "" + process1.getVersion();
+            s2 = "" + process2.getVersion();
         } else if ("status".equals(orderKey)) {
-            s1 = ""+instanceDAO1.getState();
-            s2 = ""+instanceDAO2.getState();
+            s1 = "" + instanceDAO1.getState();
+            s2 = "" + instanceDAO2.getState();
         } else if ("started".equals(orderKey)) {
             s1 = ISO8601DateParser.format(instanceDAO1.getCreateTime());
             s2 = ISO8601DateParser.format(instanceDAO2.getCreateTime());
@@ -215,62 +226,71 @@
             s1 = ISO8601DateParser.format(instanceDAO1.getLastActiveTime());
             s2 = ISO8601DateParser.format(instanceDAO2.getLastActiveTime());
         }
-        if (ascending) return s1.compareTo(s2);
-        else return s2.compareTo(s1);
+        if (ascending)
+            return s1.compareTo(s2);
+        else
+            return s2.compareTo(s1);
     }
 
     private boolean equalsOrWildcardMatch(String s1, String s2) {
-        if (s1 == null || s2 == null) return false;
-        if (s1.equals(s2)) return true;
+        if (s1 == null || s2 == null)
+            return false;
+        if (s1.equals(s2))
+            return true;
         if (s1.endsWith("*")) {
-            if (s2.startsWith(s1.substring(0, s1.length() - 1))) return true;
+            if (s2.startsWith(s1.substring(0, s1.length() - 1)))
+                return true;
         }
         if (s2.endsWith("*")) {
-            if (s1.startsWith(s2.substring(0, s2.length() - 1))) return true;
+            if (s1.startsWith(s2.substring(0, s2.length() - 1)))
+                return true;
         }
         return false;
     }
 
-    public boolean dateMatch(List<String> dateFilters, Date instanceDate,  InstanceFilter filter) {
+    public boolean dateMatch(List<String> dateFilters, Date instanceDate, InstanceFilter filter) {
         boolean match = true;
         for (String ddf : dateFilters) {
             String isoDate = ISO8601DateParser.format(instanceDate);
             String critDate = Filter.getDateWithoutOp(ddf);
             if (ddf.startsWith("=")) {
-                if (!isoDate.startsWith(critDate)) match = false;
+                if (!isoDate.startsWith(critDate))
+                    match = false;
             } else if (ddf.startsWith("<=")) {
-                if (!isoDate.startsWith(critDate) && isoDate.compareTo(critDate) > 0) match = false;
+                if (!isoDate.startsWith(critDate) && isoDate.compareTo(critDate) > 0)
+                    match = false;
             } else if (ddf.startsWith(">=")) {
-                if (!isoDate.startsWith(critDate) && isoDate.compareTo(critDate) < 0) match = false;
+                if (!isoDate.startsWith(critDate) && isoDate.compareTo(critDate) < 0)
+                    match = false;
             } else if (ddf.startsWith("<")) {
-                if (isoDate.compareTo(critDate) > 0) match = false;
+                if (isoDate.compareTo(critDate) > 0)
+                    match = false;
             } else if (ddf.startsWith(">")) {
-                if (isoDate.compareTo(critDate) < 0) match = false;
+                if (isoDate.compareTo(critDate) < 0)
+                    match = false;
             }
         }
         return match;
     }
 
-
     public ScopeDAO getScope(Long siidl) {
         for (ProcessDaoImpl process : _store.values()) {
             for (ProcessInstanceDAO instance : process._instances.values()) {
-                if (instance.getScope(siidl) != null) return instance.getScope(siidl);
+                if (instance.getScope(siidl) != null)
+                    return instance.getScope(siidl);
             }
         }
         return null;
     }
 
-
     public void insertBpelEvent(BpelEvent event, ProcessDAO processConfiguration, ProcessInstanceDAO instance) {
         _events.add(event);
     }
 
-
     public List<Date> bpelEventTimelineQuery(InstanceFilter ifilter, BpelEventFilter efilter) {
         // TODO : Provide more correct implementation:
         ArrayList<Date> dates = new ArrayList<Date>();
-        CollectionsX.transform(dates, _events, new UnaryFunction<BpelEvent,Date>() {
+        CollectionsX.transform(dates, _events, new UnaryFunction<BpelEvent, Date>() {
             public Date apply(BpelEvent x) {
                 return x.getTimestamp();
             }
@@ -278,7 +298,6 @@
         return dates;
     }
 
-
     public List<BpelEvent> bpelEventQuery(InstanceFilter ifilter, BpelEventFilter efilter) {
         // TODO : Provide a more correct (filtering) implementation:
         return _events;
@@ -288,7 +307,7 @@
      * @see org.apache.ode.bpel.dao.BpelDAOConnection#instanceQuery(String)
      */
     public Collection<ProcessInstanceDAO> instanceQuery(String expression) {
-        //TODO
+        // TODO
         throw new UnsupportedOperationException();
     }
 
@@ -301,12 +320,18 @@
     }
 
     public void defer(final Runnable runnable) {
-        _scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
-            public void afterCompletion(boolean success) {
-            }
-            public void beforeCompletion() {
-                runnable.run();
-            }
-        });
+        try {
+            _txm.getTransaction().registerSynchronization(new Synchronization() {
+                public void afterCompletion(int status) {
+                }
+
+                public void beforeCompletion() {
+                    runnable.run();
+                }
+            });
+       
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 }

Modified: incubator/ode/branches/bart/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java
URL: http://svn.apache.org/viewvc/incubator/ode/branches/bart/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java?view=diff&rev=559204&r1=559203&r2=559204
==============================================================================
--- incubator/ode/branches/bart/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java (original)
+++ incubator/ode/branches/bart/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java Tue Jul 24 13:58:12 2007
@@ -18,21 +18,35 @@
  */
 package org.apache.ode.bpel.runtime;
 
+import java.io.File;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+import javax.wsdl.PortType;
+import javax.xml.namespace.QName;
+
 import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
 import org.apache.ode.bpel.dao.BpelDAOConnectionFactoryJDBC;
 import org.apache.ode.bpel.engine.BpelServerImpl;
 import org.apache.ode.bpel.iapi.BindingContext;
+import org.apache.ode.bpel.iapi.BpelEngineException;
 import org.apache.ode.bpel.iapi.ContextException;
 import org.apache.ode.bpel.iapi.Endpoint;
 import org.apache.ode.bpel.iapi.EndpointReference;
 import org.apache.ode.bpel.iapi.EndpointReferenceContext;
+import org.apache.ode.bpel.iapi.InvocationStyle;
 import org.apache.ode.bpel.iapi.Message;
 import org.apache.ode.bpel.iapi.MessageExchangeContext;
 import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
 import org.apache.ode.bpel.iapi.PartnerRoleChannel;
 import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
 import org.apache.ode.bpel.iapi.Scheduler;
-import org.apache.ode.bpel.memdao.BpelDAOConnectionFactoryImpl;
 import org.apache.ode.dao.jpa.BPELDAOConnectionFactoryImpl;
 import org.apache.ode.il.EmbeddedGeronimoFactory;
 import org.apache.ode.il.MockScheduler;
@@ -44,21 +58,6 @@
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 
-import javax.sql.DataSource;
-import javax.transaction.TransactionManager;
-import javax.wsdl.PortType;
-import javax.xml.namespace.QName;
-import java.io.File;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
 
 class MockBpelServer {
 
@@ -72,8 +71,8 @@
     EndpointReferenceContext  _eprContext;
     MessageExchangeContext    _mexContext;
     BindingContext            _bindContext;
-    HashMap<String, QName>    _activated = new HashMap();
-    HashMap                   _endpoints = new HashMap();
+    HashMap<String, QName>    _activated = new HashMap<String,QName>();
+    HashMap<String, EndpointReference> _endpoints = new HashMap<String, EndpointReference>();
 
     public MockBpelServer() {
         try {
@@ -85,7 +84,6 @@
             if (_daoCF == null)
                 throw new RuntimeException("No DAO");
             _server.setDaoConnectionFactory(_daoCF);
-            _server.setInMemDaoConnectionFactory(new BpelDAOConnectionFactoryImpl(_scheduler));
             if (_scheduler == null)
                 throw new RuntimeException("No scheduler");
             _store = new ProcessStoreImpl(_dataSource,"jpa", true);
@@ -115,7 +113,7 @@
             MyRoleMessageExchange mex;
 
             _txManager.begin();
-            mex = _server.getEngine().createMessageExchange("" + messageId, serviceName, opName);
+            mex = _server.createMessageExchange(InvocationStyle.ASYNC,serviceName, opName, "" + messageId);
             if (mex.getOperation() == null)
                 throw new Exception("Did not find operation " + opName + " on service " + serviceName);
             Message request = mex.createMessage(mex.getOperation().getInput().getMessage().getQName());
@@ -124,7 +122,8 @@
             Element message = body.getOwnerDocument().createElementNS("", "message");
             message.appendChild(wrapper);
             request.setMessage(message);
-            mex.invoke(request);
+            mex.setRequest(request);
+            mex.invokeAsync();
             mex.complete();
             _txManager.commit();
         } catch (Exception except) {
@@ -216,6 +215,34 @@
        _mexContext =  new MessageExchangeContext() {
             public void invokePartner(PartnerRoleMessageExchange mex) { }
             public void onAsyncReply(MyRoleMessageExchange myRoleMex) { }
+            public void cancel(PartnerRoleMessageExchange mex) throws ContextException {
+                // TODO Auto-generated method stub
+                
+            }
+            public Set<InvocationStyle> getSupportedInvocationStyle(PartnerRoleChannel prc, EndpointReference partnerEpr) {
+                // TODO Auto-generated method stub
+                return null;
+            }
+            public void invokePartnerAsynch(PartnerRoleMessageExchange mex) throws ContextException {
+                // TODO Auto-generated method stub
+                
+            }
+            public void invokePartnerBlocking(PartnerRoleMessageExchange mex) throws ContextException {
+                // TODO Auto-generated method stub
+                
+            }
+            public void invokePartnerReliable(PartnerRoleMessageExchange mex) throws ContextException {
+                // TODO Auto-generated method stub
+                
+            }
+            public void invokePartnerTransacted(PartnerRoleMessageExchange mex) throws ContextException {
+                // TODO Auto-generated method stub
+                
+            }
+            public void onReliableReply(MyRoleMessageExchange myRoleMex) throws BpelEngineException {
+                // TODO Auto-generated method stub
+                
+            }
         };
         return _mexContext;
     }
@@ -267,9 +294,7 @@
         long                _nextSchedule;
 
         SchedulerWrapper(BpelServerImpl server, TransactionManager txManager, DataSource dataSource) {
-            ExecutorService executorService = Executors.newCachedThreadPool();
             _scheduler = new MockScheduler(_txManager);
-            _scheduler.setExecutorSvc(executorService);
             _scheduler.setJobProcessor(server);
         }
 
@@ -279,39 +304,24 @@
             return jobId;
         }
 
-        public String scheduleVolatileJob(boolean transacted, Map<String,Object> jobDetail) throws ContextException {
-            String jobId = _scheduler.scheduleVolatileJob(transacted, jobDetail);
-            _nextSchedule = System.currentTimeMillis();
-            return jobId;
-        }
-
         public void cancelJob(String jobId) throws ContextException {
             _scheduler.cancelJob(jobId);
         }
 
-        public <T> T execTransaction(Callable<T> transaction) throws Exception, ContextException {
-            return _scheduler.execTransaction(transaction);
-        }
-
-        public <T> Future<T> execIsolatedTransaction(Callable<T> transaction) throws Exception, ContextException {
-            return _scheduler.execIsolatedTransaction(transaction);
-        }
-
-        public boolean isTransacted() {
-            return _scheduler.isTransacted();
-        }
 
         public void start() { _scheduler.start(); }
         public void stop() { _scheduler.stop(); }
         public void shutdown() { _scheduler.shutdown(); }
 
-        public void registerSynchronizer(Synchronizer synch) throws ContextException {
-            _scheduler.registerSynchronizer(synch);
-        }
 
         public void setJobProcessor(JobProcessor processor) throws ContextException {
             _scheduler.setJobProcessor(processor);
 
+        }
+
+        public void jobCompleted(String jobId) {
+            _scheduler.jobCompleted(jobId);
+            
         }
     }