You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2007/08/06 22:47:59 UTC

svn commit: r563267 [1/2] - in /ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel: engine/ memdao/

Author: mszefler
Date: Mon Aug  6 13:47:58 2007
New Revision: 563267

URL: http://svn.apache.org/viewvc?view=rev&rev=563267
Log:
BART, some additinal refactorings. New model to fix concurrency problems in Partner invokes.

Added:
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MexDaoUtil.java   (with props)
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeCache.java   (with props)
Modified:
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkRoleImpl.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedPartnerRoleMessageExchangeImpl.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=563267&r1=563266&r2=563267
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Mon Aug  6 13:47:58 2007
@@ -54,6 +54,7 @@
 import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
 import org.apache.ode.bpel.iapi.PartnerRoleChannel;
 import org.apache.ode.bpel.iapi.ProcessConf;
+import org.apache.ode.bpel.iapi.MessageExchange.AckType;
 import org.apache.ode.bpel.iapi.MessageExchange.Status;
 import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus;
 import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
@@ -152,7 +153,7 @@
         // TODO : do this on a per-partnerlink basis, support transacted styles.
         HashSet<InvocationStyle> istyles = new HashSet<InvocationStyle>();
         istyles.add(InvocationStyle.UNRELIABLE);
-        
+
         if (!conf.isTransient()) {
             istyles.add(InvocationStyle.RELIABLE);
         } else {
@@ -185,18 +186,40 @@
 
         _hydrationLatch.latch(1);
         try {
+            // The following check is mostly for sanity purposes. MexImpls should prevent this from 
+            // happening. 
             PartnerLinkMyRoleImpl target = getMyRoleForService(mexdao.getCallee());
+            Status oldstatus = Status.valueOf(mexdao.getStatus());
             if (target == null) {
                 String errmsg = __msgs.msgMyRoleRoutingFailure(mexdao.getMessageExchangeId());
                 __log.error(errmsg);
                 mexdao.setFailureType(MessageExchange.FailureType.UNKNOWN_ENDPOINT.toString());
                 mexdao.setFaultExplanation(errmsg);
-                Status oldstatus = Status.valueOf(mexdao.getStatus());
-                mexdao.setStatus(Status.FAILURE.toString());
-                fireMexStateEvent(mexdao, oldstatus, Status.FAILURE);
+                mexdao.setStatus(Status.ACK.toString());
+                mexdao.setAckType(AckType.FAILURE);
+                fireMexStateEvent(mexdao, oldstatus, Status.ACK);
+                return;
+            }
+
+            Operation op = target._plinkDef.getMyRoleOperation(mexdao.getOperation());
+            if (op == null) {
+                String errmsg = __msgs.msgMyRoleRoutingFailure(mexdao.getMessageExchangeId());
+                __log.error(errmsg);
+                mexdao.setFailureType(MessageExchange.FailureType.UNKNOWN_OPERATION.toString());
+                mexdao.setFaultExplanation(errmsg);
+                mexdao.setStatus(Status.ACK.toString());
+                mexdao.setAckType(AckType.FAILURE);
+                fireMexStateEvent(mexdao, oldstatus, Status.ACK);
                 return;
             }
 
+            // "Acknowledge" any one-way invokes 
+            if (op.getOutput() == null) {
+                mexdao.setStatus(Status.ACK.toString());
+                mexdao.setAckType(AckType.ONEWAY);
+                fireMexStateEvent(mexdao, oldstatus, Status.ACK);
+            }
+            
             mexdao.setProcess(getProcessDAO());
 
             // TODO: fix this
@@ -220,7 +243,7 @@
             } else if (cstatus == CorrelationStatus.MATCHED) {
                 doInstanceWork(mexdao.getInstance().getInstanceId(), new Callable<Void>() {
                     public Void call() {
-                        executeContinueInstance(mexdao);
+                        executeContinueInstanceMyRoleRequestReceived(mexdao);
                         return null;
                     }
                 });
@@ -233,13 +256,9 @@
             _hydrationLatch.release(1);
         }
 
-        // TODO: relocate this code // For a one way, once the engine is done, the mex can be safely released.
-        // if (mex.getMessageExchangePattern().equals(MessageExchange.MessageExchangePattern.REQUEST_ONLY)) {
-        // mex.release();
-        // }
     }
 
-    private void executeCreateInstance(MessageExchangeDAO mexdao) {
+    void executeCreateInstance(MessageExchangeDAO mexdao) {
         assert _hydrationLatch.isLatched(1);
 
         BpelInstanceWorker worker = _instanceWorkerCache.get(mexdao.getInstance().getInstanceId());
@@ -249,11 +268,12 @@
         instanceCtx.execute();
     }
 
-    private void executeContinueInstance(MessageExchangeDAO mexdao) {
+    void executeContinueInstanceMyRoleRequestReceived(MessageExchangeDAO mexdao) {
         assert _hydrationLatch.isLatched(1);
 
         BpelInstanceWorker worker = _instanceWorkerCache.get(mexdao.getInstance().getInstanceId());
         assert worker.isWorkerThread();
+
         BpelRuntimeContextImpl instance = new BpelRuntimeContextImpl(worker, mexdao.getInstance(), null, null);
         int amp = mexdao.getChannel().indexOf('&');
         String groupId = mexdao.getChannel().substring(0, amp);
@@ -262,6 +282,20 @@
         instance.execute();
     }
 
+    void executeContinueInstancePartnerRoleResponseReceived(MessageExchangeDAO mexdao) {
+        assert _hydrationLatch.isLatched(1);
+        BpelInstanceWorker worker = _instanceWorkerCache.get(mexdao.getInstance().getInstanceId());
+        assert worker.isWorkerThread();
+
+//      TODO: we need a way to check if the lastBRC is indeed the lastBRC (serial number on the instanceDAO)
+//        BpelRuntimeContextImpl brc = lastBRC == null ? new BpelRuntimeContextImpl(worker, mexdao.getInstance(), null, null)
+//        : new BpelRuntimeContextImpl(worker, mexdao.getInstance(), lastBRC);
+        BpelRuntimeContextImpl brc = new BpelRuntimeContextImpl(worker, mexdao.getInstance(), null, null);
+
+        brc.injectPartnerResponse(mexdao.getMessageExchangeId(), mexdao.getChannel());
+        brc.execute();
+    }
+
     private void enqueueInstanceWork(Long instanceId, Runnable runnable) {
         BpelInstanceWorker iworker = _instanceWorkerCache.get(instanceId);
         iworker.enqueue(runnable);
@@ -278,7 +312,7 @@
      * @param instanceId
      * @param name
      */
-    private void scheduleInstanceWork(final Long instanceId, final Runnable runnable) {
+    void scheduleInstanceWork(final Long instanceId, final Runnable runnable) {
         _contexts.registerCommitSynchronizer(new Runnable() {
             public void run() {
                 BpelInstanceWorker iworker = _instanceWorkerCache.get(instanceId);
@@ -438,40 +472,40 @@
         BpelInstanceWorker worker = _instanceWorkerCache.get(we.getIID());
         assert worker.isWorkerThread();
 
-        ProcessInstanceDAO procInstance = getProcessDAO().getInstance(we.getIID());
-        if (procInstance == null) {
+        ProcessInstanceDAO instanceDAO = getProcessDAO().getInstance(we.getIID());
+        if (instanceDAO == null) {
             if (__log.isDebugEnabled()) {
                 __log.debug("handleWorkEvent: no ProcessInstance found with iid " + we.getIID() + "; ignoring.");
             }
             return;
         }
 
-        BpelRuntimeContextImpl processInstance = new BpelRuntimeContextImpl(worker, procInstance, null, null);
+        BpelRuntimeContextImpl brc = new BpelRuntimeContextImpl(worker, instanceDAO, null, null);
         switch (we.getType()) {
         case TIMER:
             if (__log.isDebugEnabled()) {
-                __log.debug("handleWorkEvent: TimerWork event for process instance " + processInstance);
+                __log.debug("handleWorkEvent: TimerWork event for process instance " + brc);
             }
-            processInstance.timerEvent(we.getChannel());
+            brc.timerEvent(we.getChannel());
             break;
         case RESUME:
             if (__log.isDebugEnabled()) {
                 __log.debug("handleWorkEvent: ResumeWork event for iid " + we.getIID());
             }
-            processInstance.execute();
+            brc.execute();
             break;
         case PARTNER_RESPONSE:
             if (__log.isDebugEnabled()) {
                 __log.debug("InvokeResponse event for iid " + we.getIID());
             }
-            processInstance.injectPartnerResponse(we.getMexId(), we.getChannel());
-            processInstance.execute();
+            brc.injectPartnerResponse(we.getMexId(), we.getChannel());
+            brc.execute();
             break;
         case MATCHER:
             if (__log.isDebugEnabled()) {
                 __log.debug("Matcher event for iid " + we.getIID());
             }
-            processInstance.matcherEvent(we.getCorrelatorId(), we.getCorrelationKey());
+            brc.matcherEvent(we.getCorrelatorId(), we.getCorrelationKey());
         }
     }
 
@@ -756,36 +790,12 @@
     }
 
     PartnerRoleMessageExchangeImpl createPartnerRoleMex(MessageExchangeDAO mexdao) {
-        InvocationStyle istyle = InvocationStyle.valueOf(mexdao.getInvocationStyle());
-        PartnerRoleMessageExchangeImpl mex;
+
         _hydrationLatch.latch(1);
         try {
             OPartnerLink plink = (OPartnerLink) _oprocess.getChild(mexdao.getPartnerLinkModelId());
-            Operation op = plink.getPartnerRoleOperation(mexdao.getOperation());
-            switch (istyle) {
-            case UNRELIABLE:
-                mex = new UnreliablePartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), plink, op, null, /* EPR todo */
-                plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
-                break;
-            case TRANSACTED:
-                mex = new TransactedPartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), plink, op, null, /*
-                                                                                                                             * EPR
-                                                                                                                             * todo
-                                                                                                                             */
-                plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
-                break;
-            case RELIABLE:
-                mex = new ReliablePartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), plink, op, null, /* EPR todo */
-                plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
-                break;
-
-            default:
-                throw new BpelEngineException("Unexpected InvocationStyle: " + istyle);
-
-            }
-
-            mex.load(mexdao);
-            return mex;
+            PartnerLinkPartnerRoleImpl prole = _partnerRoles.get(plink);
+            return prole.createPartnerRoleMex(mexdao);
         } finally {
             _hydrationLatch.release(1);
         }
@@ -887,6 +897,124 @@
         }
     }
 
+    MessageExchangeDAO createMessageExchange(String mexId, final char dir) {
+        if (isInMemory()) {
+            return _inMemDao.getConnection().createMessageExchange(mexId, dir);
+        } else {
+            return _contexts.dao.getConnection().createMessageExchange(mexId, dir);
+        }
+    }
+
+    MessageExchangeDAO getInMemMexDAO(String mexId) {
+        return _inMemDao.getConnection().getMessageExchange(mexId);
+    }
+
+    /**
+     * Schedule process-level work. This method defers to the server to do the scheduling and wraps the {@link Runnable} in a
+     * try-finally block that ensures that the process is hydrated.
+     * 
+     * @param runnable
+     */
+    void scheduleRunnable(final Runnable runnable) {
+        if (__log.isDebugEnabled())
+            __log.debug("schedulingRunnable for process " + _pid + ": " + runnable);
+
+        _server.scheduleRunnable(new ProcessRunnable(runnable));
+    }
+
+    void enqueueRunnable(BpelInstanceWorker worker) {
+        if (__log.isDebugEnabled())
+            __log.debug("enqueuRunnable for process " + _pid + ": " + worker);
+
+        _server.enqueueRunnable(new ProcessRunnable(worker));
+    }
+
+    MyRoleMessageExchange createNewMyRoleMex(final InvocationStyle istyle, final QName targetService, final String operation,
+            final String clientKey) {
+
+        final String mexId = new GUID().toString();
+        _hydrationLatch.latch(1);
+        try {
+
+            final PartnerLinkMyRoleImpl target = getPartnerLinkForService(targetService);
+            if (target == null)
+                throw new BpelEngineException("NoSuchService: " + targetService);
+            final Operation op = target._plinkDef.getMyRoleOperation(operation);
+            if (op == null)
+                throw new BpelEngineException("NoSuchOperation: " + operation);
+
+            return newMyRoleMex(istyle, mexId, target._endpoint.serviceName, target._plinkDef, op);
+
+        } finally {
+            _hydrationLatch.release(1);
+        }
+    }
+
+    void registerMyRoleMex(MyRoleMessageExchangeImpl mymex) {
+        _mexStateListeners.add(new WeakReference<MyRoleMessageExchangeImpl>(mymex));
+    }
+
+    void unregisterMyRoleMex(MyRoleMessageExchangeImpl mymex) {
+        ArrayList<WeakReference<MyRoleMessageExchangeImpl>> needsRemoval = new ArrayList<WeakReference<MyRoleMessageExchangeImpl>>();
+        for (WeakReference<MyRoleMessageExchangeImpl> wref : _mexStateListeners) {
+            MyRoleMessageExchangeImpl mex = wref.get();
+            if (mex == null || mex == mymex)
+                needsRemoval.add(wref);
+        }
+        _mexStateListeners.removeAll(needsRemoval);
+
+    }
+
+    void fireMexStateEvent(MessageExchangeDAO mexdao, Status old, Status news) {
+        // TODO: force a myrole mex to be created if it is not in cache.
+
+        if (old != news)
+            for (WeakReference<MyRoleMessageExchangeImpl> wr : _mexStateListeners) {
+                MyRoleMessageExchangeImpl mymex = wr.get();
+                if (mymex != null && mymex.getMessageExchangeId() != null)
+                    mymex.onStateChanged(mexdao, old, news);
+            }
+
+    }
+
+    class ProcessRunnable implements Runnable {
+        Runnable _work;
+
+        ProcessRunnable(Runnable work) {
+            _work = work;
+        }
+
+        public void run() {
+            _hydrationLatch.latch(1);
+            try {
+                _work.run();
+            } finally {
+                _hydrationLatch.release(1);
+            }
+
+        }
+
+    }
+
+    class ProcessCallable<T> implements Callable<T> {
+        Callable<T> _work;
+
+        ProcessCallable(Callable<T> work) {
+            _work = work;
+        }
+
+        public T call() throws Exception {
+            _hydrationLatch.latch(1);
+            try {
+                return _work.call();
+            } finally {
+                _hydrationLatch.release(1);
+            }
+
+        }
+
+    }
+
     class HydrationLatch extends NStateLatch {
 
         HydrationLatch() {
@@ -994,121 +1122,21 @@
 
     }
 
-    MessageExchangeDAO createMessageExchange(String mexId, final char dir) {
-        if (isInMemory()) {
-            return _inMemDao.getConnection().createMessageExchange(mexId, dir);
-        } else {
-            return _contexts.dao.getConnection().createMessageExchange(mexId, dir);
-        }
-    }
-
-    MessageExchangeDAO getInMemMexDAO(String mexId) {
-        return _inMemDao.getConnection().getMessageExchange(mexId);
-    }
-
     /**
-     * Schedule process-level work. This method defers to the server to do the scheduling and wraps the {@link Runnable} in a
-     * try-finally block that ensures that the process is hydrated.
+     * Invoke a partner via the integration layer.
      * 
-     * @param runnable
+     * @param mexDao
+     * @param brc
      */
-    void scheduleRunnable(final Runnable runnable) {
-        if (__log.isDebugEnabled())
-            __log.debug("schedulingRunnable for process " + _pid + ": " + runnable);
-
-        _server.scheduleRunnable(new ProcessRunnable(runnable));
-    }
-
-    public void enqueueRunnable(BpelInstanceWorker worker) {
-        if (__log.isDebugEnabled())
-            __log.debug("enqueuRunnable for process " + _pid + ": " + worker);
-
-        _server.enqueueRunnable(new ProcessRunnable(worker));
-    }
-
-    class ProcessRunnable implements Runnable {
-        Runnable _work;
-
-        ProcessRunnable(Runnable work) {
-            _work = work;
-        }
-
-        public void run() {
-            _hydrationLatch.latch(1);
-            try {
-                _work.run();
-            } finally {
-                _hydrationLatch.release(1);
-            }
-
-        }
-
-    }
-
-    class ProcessCallable<T> implements Callable<T> {
-        Callable<T> _work;
-
-        ProcessCallable(Callable<T> work) {
-            _work = work;
-        }
-
-        public T call() throws Exception {
-            _hydrationLatch.latch(1);
-            try {
-                return _work.call();
-            } finally {
-                _hydrationLatch.release(1);
-            }
-
-        }
-
-    }
-
-    public MyRoleMessageExchange createNewMyRoleMex(final InvocationStyle istyle, final QName targetService,
-            final String operation, final String clientKey) {
-
-        final String mexId = new GUID().toString();
+    void invokeIL(MessageExchangeDAO mexDao) {
         _hydrationLatch.latch(1);
         try {
-
-            final PartnerLinkMyRoleImpl target = getPartnerLinkForService(targetService);
-            if (target == null)
-                throw new BpelEngineException("NoSuchService: " + targetService);
-            final Operation op = target._plinkDef.getMyRoleOperation(operation);
-            if (op == null)
-                throw new BpelEngineException("NoSuchOperation: " + operation);
-
-            return newMyRoleMex(istyle, mexId, target._endpoint.serviceName, target._plinkDef, op);
-
+            OPartnerLink oplink = (OPartnerLink) _oprocess.getChild(mexDao.getPartnerLinkModelId());
+            PartnerLinkPartnerRoleImpl partnerRole = _partnerRoles.get(oplink);
+            partnerRole.invokeIL(mexDao);
         } finally {
             _hydrationLatch.release(1);
         }
     }
 
-    void registerMyRoleMex(MyRoleMessageExchangeImpl mymex) {
-        _mexStateListeners.add(new WeakReference<MyRoleMessageExchangeImpl>(mymex));
-    }
-
-    void unregisterMyRoleMex(MyRoleMessageExchangeImpl mymex) {
-        ArrayList<WeakReference<MyRoleMessageExchangeImpl>> needsRemoval = new ArrayList<WeakReference<MyRoleMessageExchangeImpl>>();
-        for (WeakReference<MyRoleMessageExchangeImpl> wref : _mexStateListeners) {
-            MyRoleMessageExchangeImpl mex = wref.get();
-            if (mex == null || mex == mymex)
-                needsRemoval.add(wref);
-        }
-        _mexStateListeners.removeAll(needsRemoval);
-
-    }
-
-    void fireMexStateEvent(MessageExchangeDAO mexdao, Status old, Status news) {
-        // TODO: force a myrole mex to be created if it is not in cache.
-        
-        if (old != news)
-            for (WeakReference<MyRoleMessageExchangeImpl> wr : _mexStateListeners) {
-                MyRoleMessageExchangeImpl mymex = wr.get();
-                if (mymex != null && mymex.getMessageExchangeId() != null)
-                    mymex.onStateChanged(mexdao, old, news);
-            }
-
-    }
 }

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?view=diff&rev=563267&r1=563266&r2=563267
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Mon Aug  6 13:47:58 2007
@@ -25,13 +25,8 @@
 import java.util.Date;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Set;
 
-import javax.transaction.InvalidTransactionException;
-import javax.transaction.SystemException;
-import javax.transaction.Transaction;
 import javax.wsdl.Operation;
-import javax.wsdl.PortType;
 import javax.xml.namespace.QName;
 
 import org.apache.commons.logging.Log;
@@ -62,6 +57,7 @@
 import org.apache.ode.bpel.iapi.InvocationStyle;
 import org.apache.ode.bpel.iapi.MessageExchange;
 import org.apache.ode.bpel.iapi.PartnerRoleChannel;
+import org.apache.ode.bpel.iapi.MessageExchange.AckType;
 import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
 import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
 import org.apache.ode.bpel.iapi.MessageExchange.Status;
@@ -92,7 +88,6 @@
 import org.apache.ode.utils.GUID;
 import org.apache.ode.utils.Namespaces;
 import org.apache.ode.utils.ObjectPrinter;
-import org.omg.CORBA._PolicyStub;
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
 
@@ -119,9 +114,6 @@
 
     private MessageExchangeDAO _instantiatingMessageExchange;
 
-    /** Object for keeping track of all the outstanding <pick>/<receive> activities */
-    private OutstandingRequestManager _outstandingRequests;
-
     /** List of pending invocations that need to be deferred until the end of the current TX */
     private List<PartnerRoleMessageExchangeImpl> _pendingPartnerRoleInvokes = new LinkedList<PartnerRoleMessageExchangeImpl>();
 
@@ -139,22 +131,26 @@
 
     private boolean _executed;
 
-    public BpelRuntimeContextImpl(BpelInstanceWorker instanceWorker, ProcessInstanceDAO dao, PROCESS PROCESS,
+    /**
+     * Construct a BRC using the soup from the previous BRC. This is handy as it allows us to eliminate the DB read of the soup,
+     * when we know the soup has not changed since the last TX.
+     * 
+     * @param instanceWorker
+     * @param instanceDao
+     * @param lastBRC
+     */
+    BpelRuntimeContextImpl(BpelInstanceWorker instanceWorker, ProcessInstanceDAO instanceDao, BpelRuntimeContextImpl lastBRC) {
+        this(instanceWorker, instanceDao, lastBRC._soup);
+    }
+
+    BpelRuntimeContextImpl(BpelInstanceWorker instanceWorker, ProcessInstanceDAO dao, PROCESS PROCESS,
             MessageExchangeDAO instantiatingMessageExchange) {
-        _instanceWorker = instanceWorker;
-        _bpelProcess = instanceWorker._process;
-        _contexts = instanceWorker._contexts;
-        _dao = dao;
-        _iid = dao.getInstanceId();
+
+        this(instanceWorker, dao, new ExecutionQueueImpl(null));
         _instantiatingMessageExchange = instantiatingMessageExchange;
-        _vpu = new JacobVPU();
-        _vpu.registerExtension(BpelRuntimeContext.class, this);
 
-        _soup = new ExecutionQueueImpl(null);
         _soup.setReplacementMap(_bpelProcess.getReplacementMap());
-        _outstandingRequests = new OutstandingRequestManager();
-        _vpu.setContext(_soup);
-
+        _soup.setGlobalData(new OutstandingRequestManager());
         byte[] daoState = _bpelProcess.isInMemory() ? null : dao.getExecutionState();
         if (daoState != null) {
             assert !_bpelProcess.isInMemory() : "did not expect to rehydrate in-mem process!";
@@ -164,13 +160,23 @@
             } catch (Exception ex) {
                 throw new RuntimeException(ex);
             }
-            _outstandingRequests = (OutstandingRequestManager) _soup.getGlobalData();
         }
-
         if (PROCESS != null) {
             _vpu.inject(PROCESS);
         }
 
+    }
+
+    BpelRuntimeContextImpl(BpelInstanceWorker instanceWorker, ProcessInstanceDAO dao, ExecutionQueueImpl soup) {
+        _instanceWorker = instanceWorker;
+        _bpelProcess = instanceWorker._process;
+        _contexts = instanceWorker._contexts;
+        _dao = dao;
+        _iid = dao.getInstanceId();
+        _vpu = new JacobVPU();
+        _vpu.registerExtension(BpelRuntimeContext.class, this);
+        _soup = soup;
+        _vpu.setContext(_soup);
         if (BpelProcess.__log.isDebugEnabled()) {
             __log.debug("BpelRuntimeContextImpl created for instance " + _iid + ". INDEXED STATE=" + _soup.getIndex());
         }
@@ -330,11 +336,11 @@
             correlators.add(processDao.getCorrelator(correlatorId));
         }
 
-        int conflict = _outstandingRequests.findConflict(selectors);
+        int conflict = getORM().findConflict(selectors);
         if (conflict != -1)
             throw new FaultException(_bpelProcess.getOProcess().constants.qnConflictingReceive, selectors[conflict].toString());
 
-        _outstandingRequests.register(pickResponseChannelStr, selectors);
+        getORM().register(pickResponseChannelStr, selectors);
 
         // TODO - ODE-58
 
@@ -508,7 +514,7 @@
 
     public void reply(final PartnerLinkInstance plinkInstnace, final String opName, final String mexId, Element msg, QName fault)
             throws FaultException {
-        String mexRef = _outstandingRequests.release(plinkInstnace, opName, mexId);
+        String mexRef = getORM().release(plinkInstnace, opName, mexId);
 
         if (mexRef == null) {
             throw new FaultException(_bpelProcess.getOProcess().constants.qnMissingRequest);
@@ -530,21 +536,20 @@
 
         myrolemex.setResponse(message);
 
-        Status status;
-
+        AckType ackType;
         if (fault != null) {
-            status = Status.FAULT;
+            ackType = AckType.FAULT;
             myrolemex.setFault(fault);
             evt.setAspect(ProcessMessageExchangeEvent.PROCESS_FAULT);
         } else {
-            status = Status.RESPONSE;
+            ackType = AckType.RESPONSE;
             evt.setAspect(ProcessMessageExchangeEvent.PROCESS_OUTPUT);
         }
 
         Status previousStatus = Status.valueOf(myrolemex.getStatus());
-        myrolemex.setStatus(status.toString());
-        
-        doMyRoleResponse(myrolemex, previousStatus, status);
+        myrolemex.setStatus(Status.ACK.toString());
+        myrolemex.setAckType(ackType);
+        doMyRoleResponse(myrolemex, previousStatus, Status.ACK);
 
         sendEvent(evt);
     }
@@ -559,7 +564,8 @@
     }
 
     /**
-     * Handle P2P responses. 
+     * Handle P2P responses.
+     * 
      * @param myrolemex
      */
     private void p2pResponse(MessageExchangeDAO myrolemex) {
@@ -683,6 +689,8 @@
     public String invoke(PartnerLinkInstance partnerLink, Operation operation, Element outgoingMessage,
             InvokeResponseChannel channel) throws FaultException {
 
+        // TODO: move a lot of this into BpelProcess
+
         // Get the Integration Layer's communication channel for the partnerlink.
         PartnerRoleChannel partnerRoleChannel = _bpelProcess.getPartnerRoleChannel(partnerLink.partnerLink);
 
@@ -741,27 +749,10 @@
 
         if (p2pProcess != null) {
             /* P2P (process-to-process) invocation, special logic */
-            invokeP2P(p2pProcess, partnerEndpoint.serviceName, operation, outgoingMessage, mexDao);
+            invokeP2P(p2pProcess, partnerEndpoint.serviceName, operation, mexDao);
         } else {
             /* NOT p2p, need to call out to IL */
-            invokeIL(partnerLink, operation, outgoingMessage, partnerRoleChannel, partnerEpr, mexDao);
-        }
-
-        // In case a response/fault was available right away, which will happen for BLOCKING/TRANSACTED invocations,
-        // we need to inject a message on the response channel, so that the process continues.
-        switch (Status.valueOf(mexDao.getStatus())) {
-        case ASYNC:
-            break;
-        case RESPONSE:
-        case FAULT:
-        case FAILURE:
-            injectPartnerResponse(mexDao.getMessageExchangeId(), mexDao.getChannel());
-            break;
-        default:
-            __log.error("Partner did not acknowledge message exchange: " + mexDao.getMessageExchangeId());
-            mexDao.setStatus(Status.FAILURE.toString());
-            mexDao.setFailureType(FailureType.NO_RESPONSE.toString());
-            injectPartnerResponse(mexDao.getMessageExchangeId(), mexDao.getChannel());
+            invokeIL(partnerLink, operation, partnerRoleChannel, partnerEpr, mexDao);
         }
 
         return mexDao.getMessageExchangeId();
@@ -778,107 +769,39 @@
      * @param partnerEpr
      * @param mexDao
      */
-    private void invokeIL(PartnerLinkInstance partnerLink, Operation operation, Element outgoingMessage,
-            PartnerRoleChannel partnerRoleChannel, EndpointReference partnerEpr, MessageExchangeDAO mexDao) {
+    private void invokeIL(PartnerLinkInstance partnerLink, Operation operation, PartnerRoleChannel partnerRoleChannel,
+            EndpointReference partnerEpr, MessageExchangeDAO mexDao) {
+
         // If we couldn't find the endpoint, then there is no sense
         // in asking the IL to invoke.
         if (partnerEpr == null) {
             __log.error("Couldn't find endpoint for partner EPR ");
             mexDao.setFailureType(FailureType.UNKNOWN_ENDPOINT.toString());
             mexDao.setFaultExplanation("UnknownEndpoint");
-            mexDao.setStatus(Status.FAILURE.toString());
+            mexDao.setStatus(Status.ACK.toString());
+            mexDao.setAckType(AckType.FAILURE);
             return;
         }
 
-        EndpointReference myRoleEpr = null; // TODO: how did we get this?
-
         mexDao.setEPR(partnerEpr.toXML().getDocumentElement());
-        mexDao.setStatus(MessageExchange.Status.REQUEST.toString());
-        Set<InvocationStyle> supportedStyles = _contexts.mexContext.getSupportedInvocationStyle(partnerRoleChannel, partnerEpr);
-
-        boolean oneway = MessageExchangePattern.valueOf(mexDao.getPattern()) == MessageExchangePattern.REQUEST_ONLY;
+        mexDao.setStatus(MessageExchange.Status.REQ.toString());
 
-        if (_bpelProcess.isInMemory()) {
-            // In-memory processes are a bit different, we're never going to do any scheduling for them, so we'd
-            // prefer to have TRANSACTED invocation style.
-            if (supportedStyles.contains(InvocationStyle.TRANSACTED)) {
-                // If TRANSACTED is supported, this is again easy, do it in-line.
-                TransactedPartnerRoleMessageExchangeImpl transactedMex = new TransactedPartnerRoleMessageExchangeImpl(_bpelProcess,
-                        mexDao.getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr,
-                        partnerRoleChannel);
-                _contexts.mexContext.invokePartnerTransacted(transactedMex);
-            } else if (supportedStyles.contains(InvocationStyle.RELIABLE) && oneway) {
-                // We can do RELIABLE for in-mem, but only if they are one way.
-                ReliablePartnerRoleMessageExchangeImpl reliableMex = new ReliablePartnerRoleMessageExchangeImpl(_bpelProcess,
-                        mexDao.getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr,
-                        partnerRoleChannel);
-                _contexts.mexContext.invokePartnerReliable(reliableMex);
-
-            } else if (supportedStyles.contains(InvocationStyle.UNRELIABLE)) {
-                // Need to cheat a little bit for in-memory processes; do the invoke in-line, but first suspend
-                // the transaction so that the IL does not get confused.
-                Transaction tx;
-                try {
-                    tx = _contexts.txManager.suspend();
-                } catch (Exception ex) {
-                    throw new BpelEngineException("TxManager Error: cannot suspend!", ex);
-                }
-                try {
-                    UnreliablePartnerRoleMessageExchangeImpl unreliableMex = new UnreliablePartnerRoleMessageExchangeImpl(_bpelProcess,
-                            mexDao.getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr,
-                            partnerRoleChannel);
-                    _contexts.mexContext.invokePartnerBlocking(unreliableMex);
-                    unreliableMex.waitForResponse();
-                } finally {
-                    try {
-                        _contexts.txManager.resume(tx);
-                    } catch (Exception e) {
-                        throw new BpelEngineException("TxManager Error: cannot resume!", e);
-                    }
-                }
-            }
-        } else {
-            if (supportedStyles.contains(InvocationStyle.TRANSACTED)) {
+        _bpelProcess.invokeIL(mexDao);
 
-                // If TRANSACTED is supported, this is again easy, do it in-line. Also, this what we always do for
-                // in-mem processes (even if the IL claims to not support it.)
-                TransactedPartnerRoleMessageExchangeImpl transactedMex = new TransactedPartnerRoleMessageExchangeImpl(_bpelProcess,
-                        mexDao.getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr,
-                        partnerRoleChannel);
-                _contexts.mexContext.invokePartnerTransacted(transactedMex);
-            } else if (supportedStyles.contains(InvocationStyle.RELIABLE)) {
-                // If RELIABLE is supported, this is easy, we just do it in-line.
-                ReliablePartnerRoleMessageExchangeImpl reliableMex = new ReliablePartnerRoleMessageExchangeImpl(_bpelProcess,
-                        mexDao.getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr,
-                        partnerRoleChannel);
-                _contexts.mexContext.invokePartnerReliable(reliableMex);
-            } else if (supportedStyles.contains(InvocationStyle.UNRELIABLE)) {
-                // For BLOCKING invocation, we defer the call until after commit (unless idempotent).
-                UnreliablePartnerRoleMessageExchangeImpl blockingMex = new UnreliablePartnerRoleMessageExchangeImpl(_bpelProcess,
-                        mexDao.getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr,
-                        partnerRoleChannel);
-                // We schedule in-memory (no db) to guarantee "at most once" semantics.
-                schedule(new UnreliableInvoker(blockingMex));
-                // TODO: how do we recover the invocation if system dies in BlockingInvoker?
-            } else {
-                // This really should not happen, indicates IL is screwy.
-                __log.error("Integration Layer did not agree to any known invocation style for EPR " + partnerEpr);
-                mexDao.setFailureType(FailureType.COMMUNICATION_ERROR.toString());
-                mexDao.setStatus(Status.FAILURE.toString());
-                mexDao.setFaultExplanation("NoMatchingStyle");
-            }
+        // In case a response/fault was available right away, which will happen for BLOCKING/TRANSACTED invocations,
+        // we need to inject a message on the response channel, so that the process continues.
+        switch (Status.valueOf(mexDao.getStatus())) {
+        case REQ:
+            break;
+        case ACK:
+            injectPartnerResponse(mexDao.getMessageExchangeId(), mexDao.getChannel());
+            break;
+        default:
+            throw new AssertionError("Unexpected MEX status: " + mexDao.getStatus());
         }
 
     }
 
-    private void schedule(final Runnable runnable) {
-        _contexts.registerCommitSynchronizer(new Runnable() {
-            public void run() {
-                _instanceWorker.enqueue(runnable);
-            }
-        });
-    }
-
     /**
      * Invoke a partner process directly (via the engine), bypassing the Integration Layer. Obviously this can only be used when an
      * process is partners with another process hosted on the same engine.
@@ -887,8 +810,7 @@
      * @param outgoingMessage
      * @param partnerRoleMex
      */
-    private void invokeP2P(BpelProcess target, QName serviceName, Operation operation, Element outgoingMessage,
-            MessageExchangeDAO partnerRoleMex) {
+    private void invokeP2P(BpelProcess target, QName serviceName, Operation operation, MessageExchangeDAO partnerRoleMex) {
         if (BpelProcess.__log.isDebugEnabled()) {
             __log.debug("Invoking in a p2p interaction, partnerrole " + partnerRoleMex.getMessageExchangeId());
         }
@@ -950,9 +872,8 @@
         if (mySessionId != null)
             myRoleMex.setProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID, mySessionId);
 
-        partnerRoleMex.setStatus(MessageExchange.Status.ASYNC.toString());
         target.invokeProcess(myRoleMex);
-        // TODO: perhaps we should check if the other process finished ,or will it always
+        // TODO: perhaps we should check if the other process finished ,or will it always?
     }
 
     void execute() {
@@ -973,7 +894,7 @@
         if (!ProcessState.isFinished(_dao.getState())) {
             if (__log.isDebugEnabled())
                 __log.debug("Setting execution state on instance " + _iid);
-            _soup.setGlobalData(_outstandingRequests);
+            _soup.setGlobalData(getORM());
 
             if (_bpelProcess.isInMemory()) {
                 // don't serialize in-memory processes
@@ -1029,8 +950,9 @@
             evt.setNewState(ProcessState.STATE_ACTIVE);
             sendEvent(evt);
         }
+        
 
-        _outstandingRequests.associate(responsechannel, mexdao.getMessageExchangeId());
+        getORM().associate(responsechannel, mexdao.getMessageExchangeId());
 
         final String mexId = mexdao.getMessageExchangeId();
         _vpu.inject(new JacobRunnable() {
@@ -1047,7 +969,7 @@
         // In case this is a pick event, we remove routes,
         // and cancel the outstanding requests.
         _dao.getProcess().removeRoutes(timerResponseChannel, _dao);
-        _outstandingRequests.cancel(timerResponseChannel);
+        getORM().cancel(timerResponseChannel);
 
         // Ignore timer events after the process is finished.
         if (ProcessState.isFinished(_dao.getState())) {
@@ -1070,7 +992,7 @@
         // receive/reply association.
         final String id = timerResponseChannel.export();
         _dao.getProcess().removeRoutes(id, _dao);
-        _outstandingRequests.cancel(id);
+        getORM().cancel(id);
 
         _vpu.inject(new JacobRunnable() {
             private static final long serialVersionUID = 6157913683737696396L;
@@ -1118,7 +1040,7 @@
 
         MessageExchange.Status status = MessageExchange.Status.valueOf(mex.getStatus());
 
-        switch (status) {
+        switch (mex.getAckType()) {
         case FAULT:
             evt.setAspect(ProcessMessageExchangeEvent.PARTNER_FAULT);
             responseChannel.onFault();
@@ -1184,7 +1106,7 @@
      * 
      */
     private void cleanupOutstandingMyRoleExchanges(FaultData optionalFaultData) {
-        String[] mexRefs = _outstandingRequests.releaseAll();
+        String[] mexRefs = getORM().releaseAll();
         for (String mexId : mexRefs) {
             MessageExchangeDAO mexDao = _dao.getConnection().getMessageExchange(mexId);
             if (mexDao != null) {
@@ -1192,20 +1114,26 @@
                 MessageExchangePattern pattern = MessageExchange.MessageExchangePattern.valueOf(mexDao.getPattern());
                 InvocationStyle istyle = InvocationStyle.valueOf(mexDao.getInvocationStyle());
                 if (pattern == MessageExchangePattern.REQUEST_ONLY) {
-                    mexDao.setStatus(Status.COMPLETED_OK.toString());
+                    mexDao.setAckType(AckType.ONEWAY);
+                    mexDao.setStatus(Status.COMPLETED.toString());
                     continue;
                 }
 
+                mexDao.setAckType(AckType.FAILURE);
                 mexDao.setFailureType(FailureType.NO_RESPONSE.toString());
                 if (optionalFaultData != null) {
                     mexDao.setFaultExplanation(optionalFaultData.toString());
                 }
                 mexDao.setFaultExplanation("Process completed without responding.");
-                doMyRoleResponse(mexDao, status, Status.FAILURE);
+                doMyRoleResponse(mexDao, status, Status.ACK);
             }
         }
     }
 
+    private OutstandingRequestManager getORM() {
+        return (OutstandingRequestManager) _soup.getGlobalData();
+    }
+
     private void cleanupOutstandingMyRoleExchanges() {
         cleanupOutstandingMyRoleExchanges(null);
     }
@@ -1230,27 +1158,16 @@
             throw new BpelEngineException(msg);
         }
 
-        MessageExchange.Status status = MessageExchange.Status.valueOf(dao.getStatus());
-        switch (status) {
-        case ASYNC:
-        case REQUEST:
-            MessageDAO request = dao.getRequest();
-            if (request == null) {
-                // this also should not happen
-                String msg = "Engine requested request for message exchange that did not have one: " + mexId;
-                __log.fatal(msg);
-                throw new BpelEngineException(msg);
-            }
-
-            return request.getData();
-
-        default:
-            // We should not be in any other state when requesting this.
-            String msg = "Engine requested response while the message exchange " + mexId + " was in the state " + status;
+        MessageDAO request = dao.getRequest();
+        if (request == null) {
+            // this also should not happen
+            String msg = "Engine requested request for message exchange that did not have one: " + mexId;
             __log.fatal(msg);
             throw new BpelEngineException(msg);
         }
 
+        return request.getData();
+
     }
 
     public QName getPartnerFault(String mexId) {
@@ -1284,9 +1201,7 @@
 
         MessageDAO response;
         MessageExchange.Status status = MessageExchange.Status.valueOf(dao.getStatus());
-        switch (status) {
-        case FAULT:
-        case RESPONSE:
+        if (status == Status.ACK) {
             response = dao.getResponse();
             if (response == null) {
                 // this also should not happen
@@ -1294,8 +1209,7 @@
                 __log.fatal(msg);
                 throw new BpelEngineException(msg);
             }
-            break;
-        default:
+        } else {
             // We should not be in any other state when requesting this.
             String msg = "Engine requested response while the message exchange " + mexId + " was in the state " + status;
             __log.fatal(msg);
@@ -1441,7 +1355,6 @@
         }
     }
 
-
     private void scheduleReliableResponse(MessageExchangeDAO messageExchange) {
         assert !_bpelProcess.isInMemory() : "Internal error; attempt to schedule in-memory process";
         assert _contexts.isTransacted();
@@ -1464,51 +1377,4 @@
 
     }
 
-    /**
-     * Runnable that actually performs UNRELIABLE invokes on the partner.
-     * 
-     * @author Maciej Szefler <mszefler at gmail dot com>
-     * 
-     */
-    class UnreliableInvoker implements Runnable {
-
-        UnreliablePartnerRoleMessageExchangeImpl _blockingMex;
-
-        public UnreliableInvoker(UnreliablePartnerRoleMessageExchangeImpl blockingMex) {
-            _blockingMex = blockingMex;
-        }
-
-        public void run() {
-            assert !_contexts.isTransacted();
-
-            // TODO: what happens if system fails right here? we'll need to add a "retry" possibility
-
-            Runnable prc;
-            try {
-                _contexts.mexContext.invokePartnerBlocking(_blockingMex);
-                prc = new PartnerResponseContinuation(_blockingMex);
-            } catch (Exception ce) {
-                prc = new PartnerResponseContinuation(_blockingMex);
-            }
-
-            // Keep using the same thread to do the work, but note we need to run this in a transaction.
-            _instanceWorker.enqueue(_bpelProcess._server.new TransactedRunnable(prc));
-        }
-
-    }
-
-
-    class PartnerResponseContinuation implements Runnable {
-
-        private UnreliablePartnerRoleMessageExchangeImpl _mex;
-
-        public PartnerResponseContinuation(UnreliablePartnerRoleMessageExchangeImpl blockingMex) {
-            _mex = blockingMex;
-        }
-
-        public void run() {
-
-        }
-
-    }
 }

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java?view=diff&rev=563267&r1=563266&r2=563267
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java Mon Aug  6 13:47:58 2007
@@ -37,6 +37,7 @@
 import org.apache.ode.bpel.iapi.InvocationStyle;
 import org.apache.ode.bpel.iapi.Message;
 import org.apache.ode.bpel.iapi.MessageExchange;
+import org.apache.ode.bpel.iapi.MessageExchange.AckType;
 import org.apache.ode.bpel.o.OPartnerLink;
 import org.apache.ode.utils.msg.MessageBundle;
 import org.w3c.dom.Element;
@@ -129,14 +130,16 @@
 
     private Set<String> _propNames;
 
+    private AckType _ackType;
+
 
 
     public MessageExchangeImpl(
             BpelProcess process, 
-            String mexId,
+            Long iid,
+            String mexId, 
             OPartnerLink oplink, 
-            PortType ptype, 
-            Operation operation) {
+            PortType ptype, Operation operation) {
         _process = process;
         _contexts = process._contexts;
         _mexId = mexId;
@@ -150,11 +153,14 @@
         return _mexId.equals(((MessageExchangeImpl)other)._mexId);
     }
 
+    Long getIID() {
+        return _iid;
+    }
     
     void load(MessageExchangeDAO dao) {
         _timeout = dao.getTimeout();
         _iid = dao.getInstance() != null ? dao.getInstance().getInstanceId() : null;
-        
+        _ackType = dao.getAckType();
         if (_fault == null)
             _fault = dao.getFault();
         if (_explanation == null)
@@ -163,7 +169,7 @@
             _status = Status.valueOf(dao.getStatus());
     }
 
-    public void save(MessageExchangeDAO dao) {
+    void save(MessageExchangeDAO dao) {
         dao.setPartnerLinkModelId(_oplink.getId());
         dao.setOperation(_operation.getName());
         dao.setStatus(_status.toString());
@@ -172,7 +178,7 @@
         dao.setFaultExplanation(_explanation);
         dao.setTimeout(_timeout);
         dao.setFailureType(_failureType == null ? null : _failureType.toString());
-        
+        dao.setAckType(_ackType);
 
         if (_changes.contains(Change.REQUEST)) {
             MessageDAO requestDao = dao.createMessage(_request.getType());
@@ -260,6 +266,10 @@
         return _status;
     }
 
+    public AckType getAckType() {
+        return _ackType;
+    }
+    
     public Operation getOperation() {
         return _operation;
     }
@@ -299,9 +309,13 @@
     }
 
     
-   
-    void setStatus(Status status) {
-        _status = status;
+    void request() {
+        _status = Status.REQ;
+    }
+    
+    void ack(AckType ackType) {
+        _status = Status.ACK;
+        _ackType = ackType;
     }
 
     public Message createMessage(javax.xml.namespace.QName msgType) {

Added: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MexDaoUtil.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MexDaoUtil.java?view=auto&rev=563267
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MexDaoUtil.java (added)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MexDaoUtil.java Mon Aug  6 13:47:58 2007
@@ -0,0 +1,46 @@
+package org.apache.ode.bpel.engine;
+
+import javax.xml.namespace.QName;
+
+import org.apache.ode.bpel.dao.MessageDAO;
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
+import org.apache.ode.bpel.iapi.MessageExchange.AckType;
+import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
+import org.apache.ode.bpel.iapi.MessageExchange.Status;
+import org.w3c.dom.Element;
+
+/**
+ * Some handy utilities methods for dealing with MEX daos.
+ *  
+ * @author Maciej Szefler <mszefler at gmail dot com>
+ *
+ */
+class MexDaoUtil {
+
+    static void setFailed(MessageExchangeDAO mex, FailureType ftype, String explanation) {
+        mex.setStatus(Status.ACK.toString());
+        mex.setAckType(AckType.FAILURE);
+        mex.setFailureType(ftype.toString());
+        mex.setFaultExplanation(explanation);
+    }
+
+    static void setFaulted(MessageExchangeDAO mex, QName faultType, Element faultmsg) {
+        mex.setStatus(Status.ACK.toString());
+        mex.setAckType(AckType.FAULT);
+        mex.setFailureType(null);
+        mex.setFault(faultType);
+        MessageDAO flt = mex.createMessage(faultType);
+        flt.setData(faultmsg);
+        mex.setResponse(flt);
+    }
+
+    static void setResponse(MessageExchangeDAO mex, Element response) {
+        mex.setStatus(Status.ACK.toString());
+        mex.setAckType(AckType.RESPONSE);
+        mex.setFailureType(null);
+        mex.setFault(null);
+        MessageDAO resp = mex.createMessage(null);
+        resp.setData(response);
+        mex.setResponse(resp);
+    }
+}

Propchange: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MexDaoUtil.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeCache.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeCache.java?view=auto&rev=563267
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeCache.java (added)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeCache.java Mon Aug  6 13:47:58 2007
@@ -0,0 +1,70 @@
+package org.apache.ode.bpel.engine;
+
+import java.lang.ref.WeakReference;
+import java.util.HashMap;
+import java.util.Iterator;
+
+/**
+ * Manage {@link MyRoleMessageExchangeImpl} object references. 
+ * 
+ * @author Maciej Szefler <mszefler at gmail dot com>
+ *
+ */
+class MyRoleMessageExchangeCache {
+    
+    private static final int CLEANUP_PERIOD = 20;
+
+    private HashMap<String, WeakReference<MyRoleMessageExchangeImpl>> _cache;
+
+    private int _inserts = 0;
+    
+    void put(MyRoleMessageExchangeImpl mex) {
+        synchronized (this) {
+            ++_inserts;
+            if (_inserts > CLEANUP_PERIOD) {
+                cleanup();
+            }
+                
+            WeakReference<MyRoleMessageExchangeImpl> ref = _cache.get(mex.getMessageExchangeId());
+            if (ref != null && ref.get() != null)
+                throw new IllegalStateException("InternalError: duplicate myrolemex registration!");
+            
+            _cache.put(mex.getMessageExchangeId(), new WeakReference<MyRoleMessageExchangeImpl>(mex));
+        }
+    }
+    
+    /**
+     * Attempt to retrieve a {@link MyRoleMessageExchangeImpl} for the given identifier.
+     * @param mexId
+     * @return
+     */
+    MyRoleMessageExchangeImpl get(String mexId) {
+        synchronized(this) {
+            WeakReference<MyRoleMessageExchangeImpl> ref = _cache.get(mexId);
+            if (ref == null)
+                return null;
+            MyRoleMessageExchangeImpl mex = ref.get();
+            if (mex == null)
+                _cache.remove(mexId);
+            return mex;
+        
+        }
+
+    }
+
+    /**
+     * Remove stale references.
+     *
+     */
+    void cleanup() {
+        synchronized(this){
+            for (Iterator<WeakReference<MyRoleMessageExchangeImpl>> i = _cache.values().iterator(); i.hasNext(); ) {
+                WeakReference<MyRoleMessageExchangeImpl> ref = i.next();
+                if (ref.get() == null)
+                    i.remove();
+            }
+            
+            _inserts = 0;
+        }
+    }
+}

Propchange: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeCache.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java?view=diff&rev=563267&r1=563266&r2=563267
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java Mon Aug  6 13:47:58 2007
@@ -33,14 +33,14 @@
     protected String _clientId;
 
     public MyRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation operation, QName callee) {
-        super(process, mexId, oplink, oplink.myRolePortType, operation);
+        super(process, null, mexId, oplink, oplink.myRolePortType, operation);
         _callee = callee;
     }
 
     public CorrelationStatus getCorrelationStatus() {
         return _cstatus;
     }
-
+  
     @Override
     void load(MessageExchangeDAO dao) {
         super.load(dao);
@@ -116,7 +116,6 @@
         we1.setProcessId(_process.getPID());
         we1.setMexId(_mexId);
 
-        setStatus(Status.ASYNC);
         _contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
         _contexts.scheduler.schedulePersistedJob(we1.getDetail(), null);
 
@@ -160,40 +159,41 @@
 
     protected void onStateChanged(MessageExchangeDAO mexdao, Status oldstatus, final Status newstatus) {
         MessageDAO response = mexdao.getResponse();
-        switch (newstatus) {
-        case RESPONSE: {
-            final Element msg = response.getData();
-            final QName msgtype = response.getType();
-            _process.scheduleRunnable(new Runnable() {
-                public void run() {
-                    serverResponded(new MemBackedMessageImpl(msg, msgtype, true));
-                }
-            });
-        }
-            break;
-        case FAULT: {
-            final QName fault = mexdao.getFault();
-            final Element faultMsg = response.getData();
-            final QName msgtype = response.getType();
-            _process.scheduleRunnable(new Runnable() {
-                public void run() {
-                    serverFaulted(fault, new MemBackedMessageImpl(faultMsg, msgtype, true));
-                }
-
-            });
-        }
-            break;
-        case FAILURE:
-            final String failureExplanation = mexdao.getFaultExplanation();
-            final FailureType ftype = FailureType.valueOf(mexdao.getFailureType());
-            _process.scheduleRunnable(new Runnable() {
-                public void run() {
-                    serverFailed(ftype, failureExplanation, null); // TODO add failure detail
-                }
-
-            });
-            break;
-        }
+        if (newstatus == Status.ACK)
+            switch (mexdao.getAckType()) {
+            case RESPONSE: {
+                final Element msg = response.getData();
+                final QName msgtype = response.getType();
+                _process.scheduleRunnable(new Runnable() {
+                    public void run() {
+                        serverResponded(new MemBackedMessageImpl(msg, msgtype, true));
+                    }
+                });
+            }
+                break;
+            case FAULT: {
+                final QName fault = mexdao.getFault();
+                final Element faultMsg = response.getData();
+                final QName msgtype = response.getType();
+                _process.scheduleRunnable(new Runnable() {
+                    public void run() {
+                        serverFaulted(fault, new MemBackedMessageImpl(faultMsg, msgtype, true));
+                    }
+    
+                });
+            }
+                break;
+            case FAILURE:
+                final String failureExplanation = mexdao.getFaultExplanation();
+                final FailureType ftype = FailureType.valueOf(mexdao.getFailureType());
+                _process.scheduleRunnable(new Runnable() {
+                    public void run() {
+                        serverFailed(ftype, failureExplanation, null); // TODO add failure detail
+                    }
+    
+                });
+                break;
+            }
     }
 
     protected void finalize() {
@@ -204,7 +204,7 @@
     void serverFaulted(QName faultType, Message outputFaultMessage) throws BpelEngineException {
         _fault = faultType;
         _response = (MessageImpl) outputFaultMessage;
-        setStatus(Status.FAULT);
+        ack(AckType.FAULT);
     }
 
    
@@ -213,15 +213,14 @@
         _explanation = null;
         _response = (MessageImpl) outputMessage;
         _response.makeReadOnly();
-        setStatus(Status.RESPONSE);
+        ack(AckType.RESPONSE);
 
     }
 
     void serverFailed(FailureType type, String reason, Element details) {
         _failureType = type;
         _explanation = reason;
-        setStatus(Status.FAILURE);
-
+        ack(AckType.FAILURE);
     }
 
 }

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java?view=diff&rev=563267&r1=563266&r2=563267
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java Mon Aug  6 13:47:58 2007
@@ -18,6 +18,15 @@
  */
 package org.apache.ode.bpel.engine;
 
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import javax.wsdl.Operation;
+import javax.xml.namespace.QName;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ode.bpel.common.CorrelationKey;
@@ -32,28 +41,21 @@
 import org.apache.ode.bpel.evt.CorrelationNoMatchEvent;
 import org.apache.ode.bpel.evt.NewProcessInstanceEvent;
 import org.apache.ode.bpel.iapi.Endpoint;
-import org.apache.ode.bpel.iapi.InvocationStyle;
 import org.apache.ode.bpel.iapi.MessageExchange;
 import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
 import org.apache.ode.bpel.iapi.ProcessState;
-import org.apache.ode.bpel.iapi.MessageExchange.Status;
+import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
 import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus;
-import org.apache.ode.bpel.intercept.InterceptorInvoker;
 import org.apache.ode.bpel.o.OMessageVarType;
 import org.apache.ode.bpel.o.OPartnerLink;
 import org.apache.ode.bpel.o.OProcess;
 import org.apache.ode.bpel.o.OScope;
 import org.apache.ode.bpel.runtime.InvalidProcessException;
-import org.apache.ode.bpel.runtime.PROCESS;
 import org.apache.ode.utils.ArrayUtils;
 import org.apache.ode.utils.ObjectPrinter;
 import org.apache.ode.utils.msg.MessageBundle;
 import org.w3c.dom.Element;
 
-import javax.wsdl.Operation;
-import javax.xml.namespace.QName;
-import java.util.*;
-
 /**
  * @author Matthieu Riou <mriou at apache dot org>
  */
@@ -96,9 +98,7 @@
         Operation operation = getMyRoleOperation(mex.getOperation());
         if (operation == null) {
             __log.error(__msgs.msgUnknownOperation(mex.getOperation(), _plinkDef.myRolePortType.getQName()));
-            mex.setStatus(Status.FAILURE.toString());
-            mex.setFailureType(MessageExchange.FailureType.UNKNOWN_OPERATION.toString());
-            mex.setFaultExplanation(mex.getOperation());
+            MexDaoUtil.setFailed(mex, FailureType.UNKNOWN_OPERATION, mex.getOperation());
             return null;
         }
 
@@ -113,9 +113,8 @@
             if (isCreateInstnace)
                 invokeMyRoleCreateInstance(mex, operation, correlatorId, correlator);
             else {
-                mex.setStatus(Status.FAILURE.toString());
-                mex.setFailureType(MessageExchange.FailureType.OTHER.toString());
-                mex.setFaultExplanation("Invalid in-memory process: non createInstance operations are not supported!");
+                MexDaoUtil.setFailed(mex, FailureType.OTHER,
+                        "Invalid in-memory process: non createInstance operations are not supported!");
                 return null;
             }
 
@@ -139,10 +138,7 @@
                 // We'd like to do a graceful exit here, no sense in rolling back due to a
                 // a message format problem.
                 __log.debug("Unable to evaluate correlation keys, invalid message format. ", ime);
-                mex.setFailureType(MessageExchange.FailureType.FORMAT_ERROR.toString());
-                mex.setStatus(Status.FAILURE.toString());
-                mex.setFaultExplanation(ime.getMessage());
-
+                MexDaoUtil.setFailed(mex, FailureType.FORMAT_ERROR,  ime.getMessage());
                 return null;
             }
 
@@ -201,8 +197,8 @@
 
                 mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.MATCHED.toString());
                 mex.setInstance(messageRoute.getTargetInstance());
-                
-                // We're overloading the channel here to be the PICK response channel +  index
+
+                // We're overloading the channel here to be the PICK response channel + index
                 mex.setChannel(messageRoute.getGroupId() + "&" + messageRoute.getIndex());
             } else {
                 if (__log.isDebugEnabled()) {
@@ -227,15 +223,7 @@
             }
 
         }
-        // Now we have to update our message exchange status. If the <reply>
-        // was not hit during the
-        // invocation, then we will be in the "REQUEST" phase which means
-        // that either this was a one-way
-        // or a two-way that needs to delivery the reply asynchronously.
-        if (Status.valueOf(mex.getStatus()) == MessageExchange.Status.REQUEST) {
-            mex.setStatus(MessageExchange.Status.ASYNC.toString());
-        }
-        
+
         return CorrelationStatus.valueOf(mex.getCorrelationStatus());
     }
 

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java?view=diff&rev=563267&r1=563266&r2=563267
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java Mon Aug  6 13:47:58 2007
@@ -18,14 +18,33 @@
  */
 package org.apache.ode.bpel.engine;
 
+import java.util.Set;
+
+import javax.transaction.Transaction;
+import javax.wsdl.Operation;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
+import org.apache.ode.bpel.engine.PartnerRoleMessageExchangeImpl.State;
+import org.apache.ode.bpel.iapi.BpelEngineException;
 import org.apache.ode.bpel.iapi.Endpoint;
+import org.apache.ode.bpel.iapi.EndpointReference;
+import org.apache.ode.bpel.iapi.InvocationStyle;
 import org.apache.ode.bpel.iapi.PartnerRoleChannel;
+import org.apache.ode.bpel.iapi.MessageExchange.AckType;
+import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
+import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
+import org.apache.ode.bpel.iapi.MessageExchange.Status;
 import org.apache.ode.bpel.o.OPartnerLink;
+import org.w3c.dom.Element;
 
 /**
+ * 
+ * Class providing a lot of the dirty work of IL invokes.
+ * 
  * @author Matthieu Riou <mriou at apache dot org>
+ * @author Maciej Szefler <mszefler at gmail dot com>
  */
 class PartnerLinkPartnerRoleImpl extends PartnerLinkRoleImpl {
     static final Log __log = LogFactory.getLog(BpelProcess.class);
@@ -39,4 +58,297 @@
         _initialPartner = initialPartner;
     }
 
+    PartnerRoleMessageExchangeImpl createPartnerRoleMex(MessageExchangeDAO mexdao) {
+        InvocationStyle istyle = InvocationStyle.valueOf(mexdao.getInvocationStyle());
+        PartnerRoleMessageExchangeImpl mex;
+        Operation op = _plinkDef.getPartnerRoleOperation(mexdao.getOperation());
+        EndpointReference myroleEPR = _plinkDef.hasMyRole() ? _process.getInitialMyRoleEPR(_plinkDef) : null;
+        switch (istyle) {
+        case UNRELIABLE:
+            mex = new UnreliablePartnerRoleMessageExchangeImpl(_process, mexdao.getInstance().getInstanceId(), mexdao
+                    .getMessageExchangeId(), _plinkDef, op, /* EPR todo */
+            null, myroleEPR, _channel);
+            break;
+        case TRANSACTED:
+            mex = new TransactedPartnerRoleMessageExchangeImpl(_process, mexdao.getInstance().getInstanceId(), mexdao
+                    .getMessageExchangeId(), _plinkDef, op, /*
+                                                             * EPR todo
+                                                             */
+            null, myroleEPR, _channel);
+            break;
+        case RELIABLE:
+            mex = new ReliablePartnerRoleMessageExchangeImpl(_process, mexdao.getInstance().getInstanceId(), mexdao
+                    .getMessageExchangeId(), _plinkDef, op, null, /* EPR todo */
+            myroleEPR, _channel);
+            break;
+
+        default:
+            throw new BpelEngineException("Unexpected InvocationStyle: " + istyle);
+
+        }
+
+        mex.load(mexdao);
+        return mex;
+
+    }
+
+    /**
+     * Invoke a partner through the integration layer.
+     * 
+     * @param mexDao
+     */
+    void invokeIL(MessageExchangeDAO mexDao) {
+
+        Element partnerEprXml = mexDao.getEPR();
+        EndpointReference partnerEpr = partnerEprXml == null ? _initialEPR : _contexts.eprContext
+                .resolveEndpointReference(partnerEprXml);
+        EndpointReference myRoleEpr = null; // TODO: fix?
+        Operation operation = _plinkDef.getPartnerRoleOperation(mexDao.getOperation());
+        Set<InvocationStyle> supportedStyles = _contexts.mexContext.getSupportedInvocationStyle(_channel, partnerEpr);
+
+        boolean oneway = MessageExchangePattern.valueOf(mexDao.getPattern()) == MessageExchangePattern.REQUEST_ONLY;
+
+        if (_process.isInMemory()) {
+            invokeInMem(mexDao, partnerEpr, myRoleEpr, operation, supportedStyles, oneway);
+        } else {
+            invokePersisted(mexDao, partnerEpr, myRoleEpr, operation, supportedStyles);
+        }
+
+    }
+
+    private void invokePersisted(MessageExchangeDAO mexDao, EndpointReference partnerEpr, EndpointReference myRoleEpr,
+            Operation operation, Set<InvocationStyle> supportedStyles) {
+        if (supportedStyles.contains(InvocationStyle.TRANSACTED)) {
+            invokeTransacted(mexDao, partnerEpr, myRoleEpr, operation);
+        } else if (supportedStyles.contains(InvocationStyle.RELIABLE)) {
+            invokeReliable(mexDao, partnerEpr, myRoleEpr, operation);
+        } else if (supportedStyles.contains(InvocationStyle.UNRELIABLE)) {
+            invokeUnreliable(mexDao, partnerEpr, myRoleEpr, operation);
+        } else {
+            // This really should not happen, indicates IL is screwy.
+            __log.error("Integration Layer did not agree to any known invocation style for EPR " + partnerEpr);
+            mexDao.setFailureType(FailureType.COMMUNICATION_ERROR.toString());
+            mexDao.setStatus(Status.ACK.toString());
+            mexDao.setAckType(AckType.FAILURE);
+            mexDao.setFaultExplanation("NoMatchingStyle");
+        }
+    }
+
+    private void invokeUnreliable(MessageExchangeDAO mexDao, EndpointReference partnerEpr, EndpointReference myRoleEpr,
+            Operation operation) {
+        // For BLOCKING invocation, we defer the call until after commit (unless idempotent).
+        UnreliablePartnerRoleMessageExchangeImpl blockingMex = new UnreliablePartnerRoleMessageExchangeImpl(_process, mexDao
+                .getInstance().getInstanceId(), mexDao.getMessageExchangeId(), _plinkDef, operation, partnerEpr, myRoleEpr,
+                _channel);
+        // We schedule in-memory (no db) to guarantee "at most once" semantics.
+        blockingMex.setState(State.INVOKE_XXX);
+        _process.scheduleInstanceWork(mexDao.getInstance().getInstanceId(), new UnreliableInvoker(blockingMex));
+    }
+
+    /**
+     * Invoke an in-memory process. In-memory processes are a bit different, we're never going to do any scheduling for them, so
+     * we'd prefer to have TRANSACTED invocation style. If that is not available we have to fake it.
+     * 
+     * @param mexDao
+     * @param partnerEpr
+     * @param myRoleEpr
+     * @param operation
+     * @param supportedStyles
+     * @param oneway
+     */
+    private void invokeInMem(MessageExchangeDAO mexDao, EndpointReference partnerEpr, EndpointReference myRoleEpr,
+            Operation operation, Set<InvocationStyle> supportedStyles, boolean oneway) {
+        // In-memory processes are a bit different, we're never going to do any scheduling for them, so we'd
+        // prefer to have TRANSACTED invocation style.
+        if (supportedStyles.contains(InvocationStyle.TRANSACTED)) {
+            invokeTransacted(mexDao, partnerEpr, myRoleEpr, operation);
+        } else if (supportedStyles.contains(InvocationStyle.RELIABLE) && oneway) {
+            invokeReliable(mexDao, partnerEpr, myRoleEpr, operation);
+        } else if (supportedStyles.contains(InvocationStyle.UNRELIABLE)) {
+            UnreliablePartnerRoleMessageExchangeImpl unreliableMex = new UnreliablePartnerRoleMessageExchangeImpl(_process, mexDao
+                    .getInstance().getInstanceId(), mexDao.getMessageExchangeId(), _plinkDef, operation, partnerEpr, myRoleEpr,
+                    _channel);
+
+            // Need to cheat a little bit for in-memory processes; do the invoke in-line, but first suspend
+            // the transaction so that the IL does not get confused.
+            Transaction tx;
+            try {
+                tx = _contexts.txManager.suspend();
+            } catch (Exception ex) {
+                throw new BpelEngineException("TxManager Error: cannot suspend!", ex);
+            }
+
+            try {
+                unreliableMex.setState(State.INVOKE_XXX);
+                _contexts.mexContext.invokePartnerBlocking(unreliableMex);
+                try {
+                    unreliableMex.waitForAck(mexDao.getTimeout());
+                } catch (InterruptedException ie) {
+                    ;
+                    ; // ignore
+                }
+
+            } finally {
+                unreliableMex.setState(State.DEAD);
+                try {
+                    _contexts.txManager.resume(tx);
+                } catch (Exception e) {
+                    throw new BpelEngineException("TxManager Error: cannot resume!", e);
+                }
+            }
+
+            if (unreliableMex.getStatus() != Status.ACK) {
+                MexDaoUtil.setFailed(mexDao, FailureType.NO_RESPONSE, "No Response");
+            } else {
+                unreliableMex.save(mexDao);
+            }
+        } else /* non-supported in-mem style */{
+            MexDaoUtil.setFailed(mexDao, FailureType.OTHER, "Unsupported invocation style for in-mem process.");
+        }
+    }
+
+    private void invokeReliable(MessageExchangeDAO mexDao, EndpointReference partnerEpr, EndpointReference myRoleEpr,
+            Operation operation) {
+        // We can do RELIABLE for in-mem, but only if they are one way.
+        ReliablePartnerRoleMessageExchangeImpl reliableMex = new ReliablePartnerRoleMessageExchangeImpl(_process, mexDao
+                .getInstance().getInstanceId(), mexDao.getMessageExchangeId(), _plinkDef, operation, partnerEpr, myRoleEpr,
+                _channel);
+        reliableMex.setState(State.INVOKE_XXX);
+        Throwable err = null;
+        try {
+            _contexts.mexContext.invokePartnerReliable(reliableMex);
+        } catch (Throwable t) {
+            err = t;
+        }
+
+        reliableMex.setState(State.HOLD);
+
+        if (err != null) {
+            MexDaoUtil.setFailed(mexDao,FailureType.COMMUNICATION_ERROR, err.toString());
+            reliableMex.setState(State.DEAD);
+        } else {
+            if (reliableMex.getStatus() == Status.ACK) {
+                reliableMex.save(mexDao);
+                reliableMex.setState(State.DEAD);
+            } else 
+                reliableMex.setState(State.ASYNC);
+        }
+                    
+    }
+
+    private void invokeTransacted(MessageExchangeDAO mexDao, EndpointReference partnerEpr, EndpointReference myRoleEpr,
+            Operation operation) {
+        // If TRANSACTED is supported, this is again easy, do it in-line.
+        TransactedPartnerRoleMessageExchangeImpl transactedMex = new TransactedPartnerRoleMessageExchangeImpl(_process, mexDao
+                .getInstance().getInstanceId(), mexDao.getMessageExchangeId(), _plinkDef, operation, partnerEpr, myRoleEpr,
+                _channel);
+        transactedMex.setState(State.INVOKE_XXX);
+        try {
+            _contexts.mexContext.invokePartnerTransacted(transactedMex);
+        } catch (Throwable t) {
+            __log.error("Transacted partner invoke threw an exception; rolling back.");
+            try {
+                _contexts.txManager.setRollbackOnly();
+            } catch (Exception ex) {
+                __log.fatal("TransactionManager error, could not setRollbackOnly()",ex);
+            }
+            throw new BpelEngineException("Rollback required.",t);
+        } finally {
+            transactedMex.setState(State.DEAD);
+        }
+        
+        if (transactedMex.getStatus() != Status.ACK) {
+            MexDaoUtil.setFailed(mexDao, FailureType.NO_RESPONSE, "Integration Layer did not provide required ACK.");
+        } else {
+            transactedMex.save(mexDao);
+        }
+        
+    }
+
+    /**
+     * Runnable that actually performs UNRELIABLE invokes on the partner.
+     * 
+     * @author Maciej Szefler <mszefler at gmail dot com>
+     * 
+     */
+    class UnreliableInvoker implements Runnable {
+
+        UnreliablePartnerRoleMessageExchangeImpl _unreliableMex;
+
+        BpelInstanceWorker _iworker;
+
+        /** Keep a copy of the last BpelRuntimeContextImpl; this is used to optimize away a DB read. */
+        BpelRuntimeContextImpl _lastBRC;
+
+        /**
+         * 
+         * @param blockingMex
+         *            the MEX we're invoking on the partner
+         * @param iworker
+         *            instance worker (for scheduling continuation)
+         * @param lastBpelRuntimeContextImpl
+         *            the BRC that initiated this invoke
+         */
+        public UnreliableInvoker(UnreliablePartnerRoleMessageExchangeImpl blockingMex) {
+            _unreliableMex = blockingMex;
+        }
+
+        public void run() {
+            assert !_contexts.isTransacted();
+
+            // Do the unreliable invoke (outside of tx context). A system failure here will result in the mex going
+            // into an unknown state requiring manual intervention.
+            Throwable err = null;
+            Status status;
+            _unreliableMex.setState(State.INVOKE_XXX);
+            try {
+                _contexts.mexContext.invokePartnerBlocking(_unreliableMex);
+                _unreliableMex.setState(State.HOLD);
+            } catch (Throwable t) {
+                _unreliableMex.setState(State.DEAD);
+                err = t;
+            }
+
+            final Throwable ferr = err;
+
+            // We proceed handling the response in a transaction. Note that if for some reason the following transaction
+            // fails, the unreliable invoke will be in an "unknown" state, and will require manual intervention to either
+            // retry or force fail.
+            try {
+
+                _contexts.execTransaction(new Runnable() {
+                    public void run() {
+
+                        MessageExchangeDAO mexdao = _process.loadMexDao(_unreliableMex.getMessageExchangeId());
+                        if (ferr != null) {
+                            MexDaoUtil.setFailed(mexdao, FailureType.OTHER, ferr.toString());
+                            _unreliableMex.setState(State.DEAD);
+                        } else if (_unreliableMex.getStatus() == Status.ACK) {
+                            _unreliableMex.save(mexdao);
+                            _unreliableMex.setState(State.DEAD);
+                        } else if (_unreliableMex.getStatus() == Status.REQ && !_unreliableMex._asyncReply) {
+                            MexDaoUtil.setFailed(mexdao, FailureType.NO_RESPONSE, "No Response");
+                            _unreliableMex.setState(State.DEAD);
+                        } else if (_unreliableMex._asyncReply) {
+                            _unreliableMex.setState(State.ASYNC);
+                            return;
+                        } else {
+                            // We should have exhausted the possibilities.
+                            throw new BpelEngineException("InternalError: Unexpected message exchange state!");
+                        }
+
+                        _process.executeContinueInstancePartnerRoleResponseReceived(mexdao);
+
+                    }
+
+                });
+            } catch (Throwable t) {
+                _unreliableMex.setState(State.DEAD);
+                __log.error("Transaction Failed (TODO!!!!): Need to mark instance for user action", t);
+                // TODO: Schedule something to pick up the job (we cant just retry bc the invoke is complete!
+            }
+
+        }
+
+    }
 }

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkRoleImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkRoleImpl.java?view=diff&rev=563267&r1=563266&r2=563267
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkRoleImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkRoleImpl.java Mon Aug  6 13:47:58 2007
@@ -28,10 +28,12 @@
     protected OPartnerLink _plinkDef;
     protected EndpointReference _initialEPR;
     protected BpelProcess _process;
+    protected Contexts _contexts;
 
     PartnerLinkRoleImpl(BpelProcess process, OPartnerLink plink) {
         _plinkDef = plink;
         _process = process;
+        _contexts = _process._contexts;
     }
     String getPartnerLinkName() {
         return _plinkDef.name;