You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2007/08/03 22:07:34 UTC

svn commit: r562569 - /ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/

Author: mszefler
Date: Fri Aug  3 13:07:33 2007
New Revision: 562569

URL: http://svn.apache.org/viewvc?view=rev&rev=562569
Log:
Removed the distinction between the ASYNC and BLOCKING invocation styles; both are replaced by a single UNRELIABLE style.

Added:
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
      - copied, changed from r562188, ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java
      - copied, changed from r561873, ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingPartnerRoleMessageExchangeImpl.java
Removed:
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncPartnerRoleMessageExchangeImpl.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingPartnerRoleMessageExchangeImpl.java
Modified:
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
    ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=562569&r1=562568&r2=562569
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Fri Aug  3 13:07:33 2007
@@ -33,7 +33,6 @@
 import java.util.concurrent.Future;
 
 import javax.wsdl.Operation;
-import javax.wsdl.PortType;
 import javax.xml.namespace.QName;
 
 import org.apache.commons.logging.Log;
@@ -55,7 +54,6 @@
 import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
 import org.apache.ode.bpel.iapi.PartnerRoleChannel;
 import org.apache.ode.bpel.iapi.ProcessConf;
-import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
 import org.apache.ode.bpel.iapi.MessageExchange.Status;
 import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus;
 import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
@@ -63,9 +61,7 @@
 import org.apache.ode.bpel.intercept.InterceptorInvoker;
 import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
 import org.apache.ode.bpel.memdao.BpelDAOConnectionFactoryImpl;
-import org.apache.ode.bpel.o.OElementVarType;
 import org.apache.ode.bpel.o.OExpressionLanguage;
-import org.apache.ode.bpel.o.OMessageVarType;
 import org.apache.ode.bpel.o.OPartnerLink;
 import org.apache.ode.bpel.o.OProcess;
 import org.apache.ode.bpel.o.Serializer;
@@ -155,10 +151,12 @@
 
         // TODO : do this on a per-partnerlink basis, support transacted styles.
         HashSet<InvocationStyle> istyles = new HashSet<InvocationStyle>();
-        istyles.add(InvocationStyle.BLOCKING);
+        istyles.add(InvocationStyle.UNRELIABLE);
+        
         if (!conf.isTransient()) {
-            istyles.add(InvocationStyle.ASYNC);
             istyles.add(InvocationStyle.RELIABLE);
+        } else {
+            istyles.add(InvocationStyle.TRANSACTED);
         }
 
         _invocationStyles = Collections.unmodifiableSet(istyles);
@@ -704,14 +702,11 @@
         case RELIABLE:
             mex = new ReliableMyRoleMessageExchangeImpl(this, mexId, oplink, operation, target);
             break;
-        case ASYNC:
-            mex = new AsyncMyRoleMessageExchangeImpl(this, mexId, oplink, operation, target);
-            break;
         case TRANSACTED:
             mex = new TransactedMyRoleMessageExchangeImpl(this, mexId, oplink, operation, target);
             break;
-        case BLOCKING:
-            mex = new BlockingMyRoleMessageExchangeImpl(this, mexId, oplink, operation, target);
+        case UNRELIABLE:
+            mex = new UnreliableMyRoleMessageExchangeImpl(this, mexId, oplink, operation, target);
             break;
         default:
             throw new AssertionError("Unexpected invocation style: " + istyle);
@@ -768,15 +763,10 @@
             OPartnerLink plink = (OPartnerLink) _oprocess.getChild(mexdao.getPartnerLinkModelId());
             Operation op = plink.getPartnerRoleOperation(mexdao.getOperation());
             switch (istyle) {
-            case BLOCKING:
-                mex = new BlockingPartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), plink, op, null, /* EPR todo */
+            case UNRELIABLE:
+                mex = new UnreliablePartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), plink, op, null, /* EPR todo */
                 plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
                 break;
-            case ASYNC:
-                mex = new AsyncPartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), plink, op, null, /* EPR todo */
-                plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
-                break;
-
             case TRANSACTED:
                 mex = new TransactedPartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), plink, op, null, /*
                                                                                                                              * EPR
@@ -1111,6 +1101,8 @@
     }
 
     void fireMexStateEvent(MessageExchangeDAO mexdao, Status old, Status news) {
+        // TODO: force a myrole mex to be created if it is not in cache.
+        
         if (old != news)
             for (WeakReference<MyRoleMessageExchangeImpl> wr : _mexStateListeners) {
                 MyRoleMessageExchangeImpl mymex = wr.get();
@@ -1118,6 +1110,5 @@
                     mymex.onStateChanged(mexdao, old, news);
             }
 
-        // TODO: need to call MessageExchangeContext#onMyRoleMessageExchangeStateChanged
     }
 }

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?view=diff&rev=562569&r1=562568&r2=562569
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Fri Aug  3 13:07:33 2007
@@ -27,6 +27,9 @@
 import java.util.List;
 import java.util.Set;
 
+import javax.transaction.InvalidTransactionException;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
 import javax.wsdl.Operation;
 import javax.wsdl.PortType;
 import javax.xml.namespace.QName;
@@ -89,6 +92,7 @@
 import org.apache.ode.utils.GUID;
 import org.apache.ode.utils.Namespaces;
 import org.apache.ode.utils.ObjectPrinter;
+import org.omg.CORBA._PolicyStub;
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
 
@@ -125,7 +129,7 @@
     private List<MyRoleMessageExchangeImpl> _pendingMyRoleReplies = new LinkedList<MyRoleMessageExchangeImpl>();
 
     private BpelInstanceWorker _instanceWorker;
-    
+
     private BpelProcess _bpelProcess;
 
     /** Five second maximum for continous execution. */
@@ -135,9 +139,7 @@
 
     private boolean _executed;
 
-    public BpelRuntimeContextImpl(BpelInstanceWorker instanceWorker, 
-            ProcessInstanceDAO dao, 
-            PROCESS PROCESS,
+    public BpelRuntimeContextImpl(BpelInstanceWorker instanceWorker, ProcessInstanceDAO dao, PROCESS PROCESS,
             MessageExchangeDAO instantiatingMessageExchange) {
         _instanceWorker = instanceWorker;
         _bpelProcess = instanceWorker._process;
@@ -174,6 +176,10 @@
         }
     }
 
+    public String toString() {
+        return "{BpelRuntimeCtx PID=" + _bpelProcess.getPID() + ", IID=" + _iid + "}";
+    }
+
     public Long getPid() {
         return _iid;
     }
@@ -537,37 +543,37 @@
 
         Status previousStatus = Status.valueOf(myrolemex.getStatus());
         myrolemex.setStatus(status.toString());
-        _bpelProcess.fireMexStateEvent(myrolemex, previousStatus, status);
         
-        if (myrolemex.getPipedMessageExchange() != null) /* p2p case */{
-            MessageExchangeDAO pmex = myrolemex.getPipedMessageExchange();
+        doMyRoleResponse(myrolemex, previousStatus, status);
 
-            if (BpelProcess.__log.isDebugEnabled()) {
-                __log.debug("Replying to a p2p mex, myrole " + myrolemex + " - partnerole " + pmex);
-            }
-
-            // In the p2p case we copy the status/response from one mex object into the other.
-            pmex.setResponse(myrolemex.getResponse());
-            pmex.setStatus(myrolemex.getStatus());
-            continuePartnerReplied(pmex);
-            myrolemex.release();
+        sendEvent(evt);
+    }
 
+    private void doMyRoleResponse(MessageExchangeDAO myrolemex, Status previousStatus, Status newStatus) {
+        myrolemex.setStatus(newStatus.toString());
+        if (myrolemex.getPipedMessageExchange() != null) /* p2p case */{
+            p2pResponse(myrolemex);
         } else /* IL-mediated communication */{
-            InvocationStyle istyle = InvocationStyle.valueOf(myrolemex.getInvocationStyle());
-            switch (istyle) {
-            case RELIABLE:
-                scheduleReliableResponse(myrolemex);
-                break;
-            case ASYNC:
-                scheduleAsyncResponse(myrolemex);
-                break;
-            default:
-                break;
-            }
+            _bpelProcess.fireMexStateEvent(myrolemex, previousStatus, newStatus);
+        }
+    }
 
+    /**
+     * Handle P2P responses. 
+     * @param myrolemex
+     */
+    private void p2pResponse(MessageExchangeDAO myrolemex) {
+        MessageExchangeDAO pmex = myrolemex.getPipedMessageExchange();
+
+        if (BpelProcess.__log.isDebugEnabled()) {
+            __log.debug("Replying to a p2p mex, myrole " + myrolemex + " - partnerole " + pmex);
         }
 
-        sendEvent(evt);
+        // In the p2p case we copy the status/response from one mex object into the other.
+        pmex.setResponse(myrolemex.getResponse());
+        pmex.setStatus(myrolemex.getStatus());
+        continuePartnerReplied(pmex);
+        myrolemex.release();
     }
 
     /**
@@ -648,16 +654,26 @@
     }
 
     public void registerTimer(TimerResponseChannel timerChannel, Date timeToFire) {
+
+        if (_bpelProcess.isInMemory())
+            throw new InvalidProcessException("Process not compatible with in-memory execution.");
+
         WorkEvent we = new WorkEvent();
         we.setIID(_dao.getInstanceId());
+        we.setProcessId(_bpelProcess.getPID());
         we.setChannel(timerChannel.export());
         we.setType(WorkEvent.Type.TIMER);
         _contexts.scheduler.schedulePersistedJob(we.getDetail(), timeToFire);
     }
 
     private void scheduleCorrelatorMatcher(String correlatorId, CorrelationKey key) {
+
+        if (_bpelProcess.isInMemory())
+            throw new InvalidProcessException("Process not compatible with in-memory execution.");
+
         WorkEvent we = new WorkEvent();
         we.setIID(_dao.getInstanceId());
+        we.setProcessId(_bpelProcess.getPID());
         we.setType(WorkEvent.Type.MATCHER);
         we.setCorrelatorId(correlatorId);
         we.setCorrelationKey(key);
@@ -696,7 +712,8 @@
         evt.setPortType(partnerLink.partnerLink.partnerRolePortType.getQName());
         evt.setAspect(ProcessMessageExchangeEvent.PARTNER_INPUT);
 
-        MessageExchangeDAO mexDao = _dao.getConnection().createMessageExchange(new GUID().toString(),MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE);
+        MessageExchangeDAO mexDao = _dao.getConnection().createMessageExchange(new GUID().toString(),
+                MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE);
         mexDao.setStatus(MessageExchange.Status.NEW.toString());
         mexDao.setOperation(operation.getName());
         mexDao.setPortType(partnerLink.partnerLink.partnerRolePortType.getQName());
@@ -773,39 +790,83 @@
             return;
         }
 
-        PortType portType = partnerLink.partnerLink.partnerRolePortType;
         EndpointReference myRoleEpr = null; // TODO: how did we get this?
 
         mexDao.setEPR(partnerEpr.toXML().getDocumentElement());
         mexDao.setStatus(MessageExchange.Status.REQUEST.toString());
         Set<InvocationStyle> supportedStyles = _contexts.mexContext.getSupportedInvocationStyle(partnerRoleChannel, partnerEpr);
-        if (supportedStyles.contains(InvocationStyle.RELIABLE)) {
-            // If RELIABLE is supported, this is easy, we just do it in-line.
-            ReliablePartnerRoleMessageExchangeImpl reliableMex = new ReliablePartnerRoleMessageExchangeImpl(_bpelProcess, mexDao
-                    .getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr, partnerRoleChannel);
-            _contexts.mexContext.invokePartnerReliable(reliableMex);
-        } else if (supportedStyles.contains(InvocationStyle.TRANSACTED)) {
-            // If TRANSACTED is supported, this is again easy, do it in-line.
-            TransactedPartnerRoleMessageExchangeImpl transactedMex = new TransactedPartnerRoleMessageExchangeImpl(_bpelProcess,
-                    mexDao.getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr, partnerRoleChannel);
-            _contexts.mexContext.invokePartnerTransacted(transactedMex);
-        } else if (supportedStyles.contains(InvocationStyle.BLOCKING)) {
-            // For BLOCKING invocation, we defer the call until after commit (unless idempotent).
-            BlockingPartnerRoleMessageExchangeImpl blockingMex = new BlockingPartnerRoleMessageExchangeImpl(_bpelProcess, mexDao
-                    .getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr, partnerRoleChannel);
-            schedule(new BlockingInvoker(blockingMex));
-        } else if (supportedStyles.contains(InvocationStyle.ASYNC)) {
-            // For ASYNC style, we defer the call until after commit (unless idempotent).
-            AsyncPartnerRoleMessageExchangeImpl asyncMex = new AsyncPartnerRoleMessageExchangeImpl(_bpelProcess, mexDao
-                    .getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr, partnerRoleChannel);
-            schedule(new AsyncInvoker(asyncMex));
-            
+
+        boolean oneway = MessageExchangePattern.valueOf(mexDao.getPattern()) == MessageExchangePattern.REQUEST_ONLY;
+
+        if (_bpelProcess.isInMemory()) {
+            // In-memory processes are a bit different, we're never going to do any scheduling for them, so we'd
+            // prefer to have TRANSACTED invocation style.
+            if (supportedStyles.contains(InvocationStyle.TRANSACTED)) {
+                // If TRANSACTED is supported, this is again easy, do it in-line.
+                TransactedPartnerRoleMessageExchangeImpl transactedMex = new TransactedPartnerRoleMessageExchangeImpl(_bpelProcess,
+                        mexDao.getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr,
+                        partnerRoleChannel);
+                _contexts.mexContext.invokePartnerTransacted(transactedMex);
+            } else if (supportedStyles.contains(InvocationStyle.RELIABLE) && oneway) {
+                // We can do RELIABLE for in-mem, but only if they are one way.
+                ReliablePartnerRoleMessageExchangeImpl reliableMex = new ReliablePartnerRoleMessageExchangeImpl(_bpelProcess,
+                        mexDao.getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr,
+                        partnerRoleChannel);
+                _contexts.mexContext.invokePartnerReliable(reliableMex);
+
+            } else if (supportedStyles.contains(InvocationStyle.UNRELIABLE)) {
+                // Need to cheat a little bit for in-memory processes; do the invoke in-line, but first suspend
+                // the transaction so that the IL does not get confused.
+                Transaction tx;
+                try {
+                    tx = _contexts.txManager.suspend();
+                } catch (Exception ex) {
+                    throw new BpelEngineException("TxManager Error: cannot suspend!", ex);
+                }
+                try {
+                    UnreliablePartnerRoleMessageExchangeImpl unreliableMex = new UnreliablePartnerRoleMessageExchangeImpl(_bpelProcess,
+                            mexDao.getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr,
+                            partnerRoleChannel);
+                    _contexts.mexContext.invokePartnerBlocking(unreliableMex);
+                    unreliableMex.waitForResponse();
+                } finally {
+                    try {
+                        _contexts.txManager.resume(tx);
+                    } catch (Exception e) {
+                        throw new BpelEngineException("TxManager Error: cannot resume!", e);
+                    }
+                }
+            }
         } else {
-            // This really should not happen, indicates IL is screwy.
-            __log.error("Integration Layer did not agree to any known invocation style for EPR " + partnerEpr);
-            mexDao.setFailureType(FailureType.COMMUNICATION_ERROR.toString());
-            mexDao.setStatus(Status.FAILURE.toString());
-            mexDao.setFaultExplanation("NoMatchingStyle");
+            if (supportedStyles.contains(InvocationStyle.TRANSACTED)) {
+
+                // If TRANSACTED is supported, this is again easy, do it in-line. Also, this what we always do for
+                // in-mem processes (even if the IL claims to not support it.)
+                TransactedPartnerRoleMessageExchangeImpl transactedMex = new TransactedPartnerRoleMessageExchangeImpl(_bpelProcess,
+                        mexDao.getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr,
+                        partnerRoleChannel);
+                _contexts.mexContext.invokePartnerTransacted(transactedMex);
+            } else if (supportedStyles.contains(InvocationStyle.RELIABLE)) {
+                // If RELIABLE is supported, this is easy, we just do it in-line.
+                ReliablePartnerRoleMessageExchangeImpl reliableMex = new ReliablePartnerRoleMessageExchangeImpl(_bpelProcess,
+                        mexDao.getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr,
+                        partnerRoleChannel);
+                _contexts.mexContext.invokePartnerReliable(reliableMex);
+            } else if (supportedStyles.contains(InvocationStyle.UNRELIABLE)) {
+                // For BLOCKING invocation, we defer the call until after commit (unless idempotent).
+                UnreliablePartnerRoleMessageExchangeImpl blockingMex = new UnreliablePartnerRoleMessageExchangeImpl(_bpelProcess,
+                        mexDao.getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr,
+                        partnerRoleChannel);
+                // We schedule in-memory (no db) to guarantee "at most once" semantics.
+                schedule(new UnreliableInvoker(blockingMex));
+                // TODO: how do we recover the invocation if system dies in BlockingInvoker?
+            } else {
+                // This really should not happen, indicates IL is screwy.
+                __log.error("Integration Layer did not agree to any known invocation style for EPR " + partnerEpr);
+                mexDao.setFailureType(FailureType.COMMUNICATION_ERROR.toString());
+                mexDao.setStatus(Status.FAILURE.toString());
+                mexDao.setFaultExplanation("NoMatchingStyle");
+            }
         }
 
     }
@@ -844,7 +905,7 @@
             else if (target.getSupportedInvocationStyle(serviceName).contains(InvocationStyle.TRANSACTED))
                 style = InvocationStyle.TRANSACTED;
             else
-                style = InvocationStyle.BLOCKING;
+                style = InvocationStyle.UNRELIABLE;
         } else /* persisted */{
 
             if (operation.getOutput() != null
@@ -869,7 +930,8 @@
         if (__log.isDebugEnabled())
             __log.debug("INVOKE PARTNER (SEP): sessionId=" + mySessionId + " partnerSessionId=" + partnerSessionId);
 
-        MessageExchangeDAO myRoleMex = _bpelProcess.createMessageExchange(new GUID().toString(),MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
+        MessageExchangeDAO myRoleMex = _bpelProcess.createMessageExchange(new GUID().toString(),
+                MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
         myRoleMex.setCallee(serviceName);
         myRoleMex.setPipedMessageExchange(partnerRoleMex);
         myRoleMex.setOperation(partnerRoleMex.getOperation());
@@ -898,9 +960,8 @@
             throw new BpelEngineException("MUST RUN IN TRANSACTION!");
         if (_executed)
             throw new IllegalStateException("cannot call execute() twice!");
-        
+
         long maxTime = System.currentTimeMillis() + _maxReductionTimeMs;
-        
 
         // Execute the process state reductions
         boolean canReduce = true;
@@ -941,6 +1002,7 @@
                 try {
                     WorkEvent we = new WorkEvent();
                     we.setIID(_iid);
+                    we.setProcessId(_bpelProcess.getPID());
                     we.setType(WorkEvent.Type.RESUME);
                     _contexts.scheduler.schedulePersistedJob(we.getDetail(), new Date());
                 } catch (ContextException e) {
@@ -1139,15 +1201,7 @@
                     mexDao.setFaultExplanation(optionalFaultData.toString());
                 }
                 mexDao.setFaultExplanation("Process completed without responding.");
-
-                switch (istyle) {
-                case RELIABLE:
-                    scheduleReliableResponse(mexDao);
-                    break;
-                case ASYNC:
-                    scheduleAsyncResponse(mexDao);
-                }
-
+                doMyRoleResponse(mexDao, status, Status.FAILURE);
             }
         }
     }
@@ -1351,7 +1405,7 @@
      * Attempt to match message exchanges on a correlator.
      * 
      */
-    public void matcherEvent(String correlatorId, CorrelationKey ckey) {
+    void matcherEvent(String correlatorId, CorrelationKey ckey) {
         if (BpelProcess.__log.isDebugEnabled()) {
             __log.debug("MatcherEvent handling: correlatorId=" + correlatorId + ", ckey=" + ckey);
         }
@@ -1387,19 +1441,6 @@
         }
     }
 
-    /**
-     * Add a scheduled ASYNC response.
-     * 
-     * @param messageExchangeId
-     */
-    private void scheduleAsyncResponse(MessageExchangeDAO mexdao) {
-        assert !_bpelProcess.isInMemory() : "Internal error; attempt to schedule in-memory process";
-        assert _contexts.isTransacted();
-
-        final MyRoleMessageExchangeImpl mex = _bpelProcess.recreateMyRoleMex(mexdao);
-        _pendingMyRoleReplies.add(mex);
-    }
-
 
     private void scheduleReliableResponse(MessageExchangeDAO messageExchange) {
         assert !_bpelProcess.isInMemory() : "Internal error; attempt to schedule in-memory process";
@@ -1422,31 +1463,52 @@
     private void continuePartnerReplied(MessageExchangeDAO pmex) {
 
     }
-    
-    class BlockingInvoker implements Runnable {
 
-        public BlockingInvoker(BlockingPartnerRoleMessageExchangeImpl blockingMex) {
-            // TODO Auto-generated constructor stub
+    /**
+     * Runnable that actually performs UNRELIABLE invokes on the partner.
+     * 
+     * @author Maciej Szefler <mszefler at gmail dot com>
+     * 
+     */
+    class UnreliableInvoker implements Runnable {
+
+        UnreliablePartnerRoleMessageExchangeImpl _blockingMex;
+
+        public UnreliableInvoker(UnreliablePartnerRoleMessageExchangeImpl blockingMex) {
+            _blockingMex = blockingMex;
         }
 
         public void run() {
-            // TODO Auto-generated method stub
-            
+            assert !_contexts.isTransacted();
+
+            // TODO: what happens if system fails right here? we'll need to add a "retry" possibility
+
+            Runnable prc;
+            try {
+                _contexts.mexContext.invokePartnerBlocking(_blockingMex);
+                prc = new PartnerResponseContinuation(_blockingMex);
+            } catch (Exception ce) {
+                prc = new PartnerResponseContinuation(_blockingMex);
+            }
+
+            // Keep using the same thread to do the work, but note we need to run this in a transaction.
+            _instanceWorker.enqueue(_bpelProcess._server.new TransactedRunnable(prc));
         }
-        
+
     }
 
-    
-    class AsyncInvoker implements Runnable {
 
-        public AsyncInvoker(AsyncPartnerRoleMessageExchangeImpl asyncMex) {
-            // TODO Auto-generated constructor stub
+    class PartnerResponseContinuation implements Runnable {
+
+        private UnreliablePartnerRoleMessageExchangeImpl _mex;
+
+        public PartnerResponseContinuation(UnreliablePartnerRoleMessageExchangeImpl blockingMex) {
+            _mex = blockingMex;
         }
 
         public void run() {
-            // TODO Auto-generated method stub
-            
+
         }
-        
+
     }
 }

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java?view=diff&rev=562569&r1=562568&r2=562569
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java Fri Aug  3 13:07:33 2007
@@ -171,6 +171,7 @@
 
                         WorkEvent we = new WorkEvent();
                         we.setIID(iid);
+                        we.setProcessId(_process.getPID());
                         we.setType(WorkEvent.Type.RESUME);
                         _process._contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
 
@@ -294,6 +295,7 @@
 
                         WorkEvent we = new WorkEvent();
                         we.setType(WorkEvent.Type.RESUME);
+                        we.setProcessId(_process.getPID());
                         we.setIID(iid);
                         _process._contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
 

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java?view=diff&rev=562569&r1=562568&r2=562569
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java Fri Aug  3 13:07:33 2007
@@ -218,7 +218,6 @@
     }
 
     void serverFailed(FailureType type, String reason, Element details) {
-        // TODO not using FailureType, nor details
         _failureType = type;
         _explanation = reason;
         setStatus(Status.FAILURE);

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java?view=diff&rev=562569&r1=562568&r2=562569
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java Fri Aug  3 13:07:33 2007
@@ -175,6 +175,7 @@
 
     protected WorkEvent generateInvokeResponseWorkEvent() {
         WorkEvent we = new WorkEvent();
+        we.setProcessId(_process.getPID());
         we.setIID(_iid);
         we.setType(WorkEvent.Type.PARTNER_RESPONSE);
         we.setChannel(_responseChannel);
@@ -189,9 +190,6 @@
         if (getStatus() != MessageExchange.Status.REQUEST && getStatus() != MessageExchange.Status.ASYNC)
             throw new BpelEngineException("Invalid message exchange state, expect REQUEST or ASYNC, but got " + getStatus());
 
-        // In-memory processe are special, they don't allow scheduling so any replies must be delivered immediately.
-        if (!_blocked && _process.isInMemory())
-            throw new BpelEngineException("Cannot reply to in-memory process outside of BLOCKING call");
     }
 
 }

Copied: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java (from r562188, ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java)
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java?view=diff&rev=562569&p1=ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java&r1=562188&p2=ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java&r2=562569
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java Fri Aug  3 13:07:33 2007
@@ -11,8 +11,8 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.ode.bpel.dao.MessageDAO;
 import org.apache.ode.bpel.dao.MessageExchangeDAO;
+import org.apache.ode.bpel.iapi.BpelEngineException;
 import org.apache.ode.bpel.iapi.InvocationStyle;
 import org.apache.ode.bpel.o.OPartnerLink;
 
@@ -22,12 +22,13 @@
  * @author Maciej Szefler <mszefler at gmail dot com>
  * 
  */
-public class AsyncMyRoleMessageExchangeImpl extends MyRoleMessageExchangeImpl {
+public class UnreliableMyRoleMessageExchangeImpl extends MyRoleMessageExchangeImpl {
     private static final Log __log = LogFactory.getLog(ReliableMyRoleMessageExchangeImpl.class);
 
+    boolean _done = false;
     ResponseFuture _future;
     
-    public AsyncMyRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation operation, QName callee) {
+    public UnreliableMyRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation operation, QName callee) {
         super(process, mexId, oplink, operation, callee);
     }
 
@@ -59,7 +60,7 @@
         _process.enqueueTransaction(new Callable<Void>() {
 
             public Void call() throws Exception {
-                AsyncMyRoleMessageExchangeImpl.super.setStatus(Status.REQUEST);
+                UnreliableMyRoleMessageExchangeImpl.super.setStatus(Status.REQUEST);
                 MessageExchangeDAO dao = _process.createMessageExchange(getMessageExchangeId(), MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
                 save(dao);
                 if (_process.isInMemory()) 
@@ -76,6 +77,32 @@
 
     }
 
+  
+    @Override
+    public InvocationStyle getInvocationStyle() {
+        return InvocationStyle.UNRELIABLE;
+    }
+    
+
+    @Override
+    public Status invokeBlocking() throws BpelEngineException, TimeoutException {
+        if (_done) 
+            return getStatus();
+
+        Future<Status> future = _future != null ? _future : super.invokeAsync();
+        
+        try {
+            future.get(Math.max(_timeout,1), TimeUnit.MILLISECONDS);
+            _done = true;
+            return getStatus();
+        } catch (InterruptedException e) {
+            throw new BpelEngineException(e);
+        } catch (ExecutionException e) {
+            throw new BpelEngineException(e.getCause());
+        } 
+    }    
+    
+
     private static class ResponseFuture implements Future<Status> {
         private Status _status;
 
@@ -121,11 +148,6 @@
                 this.notifyAll();
             }
         }
-    }
-
-    @Override
-    public InvocationStyle getInvocationStyle() {
-        return InvocationStyle.ASYNC;
     }
 
 }

Copied: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java (from r561873, ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingPartnerRoleMessageExchangeImpl.java)
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java?view=diff&rev=562569&p1=ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingPartnerRoleMessageExchangeImpl.java&r1=561873&p2=ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java&r2=562569
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingPartnerRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java Fri Aug  3 13:07:33 2007
@@ -2,46 +2,109 @@
 
 import javax.wsdl.Operation;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
+import org.apache.ode.bpel.engine.MessageExchangeImpl.InDbAction;
 import org.apache.ode.bpel.iapi.BpelEngineException;
 import org.apache.ode.bpel.iapi.EndpointReference;
 import org.apache.ode.bpel.iapi.InvocationStyle;
 import org.apache.ode.bpel.iapi.MessageExchangeContext;
 import org.apache.ode.bpel.iapi.PartnerRoleChannel;
 import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
+import org.apache.ode.bpel.iapi.MessageExchange.Status;
 import org.apache.ode.bpel.o.OPartnerLink;
 
 /**
  * Implementation of the {@link PartnerRoleMessageExchange} interface that is passed to the IL when the 
- * BLOCKING invocation style is used (see {@link InvocationStyle#BLOCKING}). The basic idea here is that 
- * with this style, the IL performs the operation while blocking in the 
- * {@link MessageExchangeContext#invokePartner(org.apache.ode.bpel.iapi.PartnerRoleMessageExchange)} method.
+ * UNRELIABLE invocation style is used (see {@link InvocationStyle#UNRELIABLE}). The basic idea here is
+ * that with this style, the IL performs the operation outside of a transactional context. It can either
+ * finish it right away (blocking) or indicate that the response will be provided later (replyAsync).
  *
- * This InvocationStyle makes this class rather trivial. 
+ *  
+ *  TODO: serious synchronization issues in this class.
  *  
  * @author Maciej Szefler <mszefler at gmail dot com>
  *
  */
-public class BlockingPartnerRoleMessageExchangeImpl extends PartnerRoleMessageExchangeImpl {
+public class UnreliablePartnerRoleMessageExchangeImpl extends PartnerRoleMessageExchangeImpl {
+    private static final Log __log = LogFactory.getLog(UnreliablePartnerRoleMessageExchangeImpl.class);
+    
 
-    BlockingPartnerRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation operation, EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel channel) {
+    UnreliablePartnerRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation operation, EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel channel) {
         super(process, mexId, oplink, operation, epr, myRoleEPR, channel);
     }
 
-    /**
-     * The criteria for issuing a replyXXX call on BLOCKING message exchanges is that the replyXXX must come while the
-     * engine is blocked in an  
-     * {@link MessageExchangeContext#invokePartnerBlocking(org.apache.ode.bpel.iapi.PartnerRoleMessageExchange)}. 
-     * method. 
-     */
+
+    @Override
+    public InvocationStyle getInvocationStyle() {
+        return InvocationStyle.UNRELIABLE;
+    }
+
+
+    @Override
+    protected void resumeInstance() {
+        assert !_contexts.isTransacted() : "checkReplyContext() should have prevented us from getting here.";
+        assert !_process.isInMemory() : "resumeInstance() for in-mem processes makes no sense.";
+
+        final WorkEvent we = generateInvokeResponseWorkEvent();
+        if (__log.isDebugEnabled()) {
+            __log.debug("resumeInstance: scheduling WorkEvent " + we);
+        }
+
+
+        doInTX(new InDbAction<Void>() {
+
+            public Void call(MessageExchangeDAO mexdao) {
+                save(mexdao);
+                _contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
+                return null;
+            }
+        });
+    }
+
     @Override
     protected void checkReplyContextOk() {
-        if (!_blocked)
+        super.checkReplyContextOk();
+
+        if (!_blocked && getStatus() != Status.ASYNC)
             throw new BpelEngineException("replyXXX operation attempted outside of BLOCKING region!");
+
+        // Prevent the user from attempting replyXXX calls while a transaction is active.
+        if (_contexts.isTransacted())
+            throw new BpelEngineException("Cannot reply to UNRELIABLE style invocation from a transactional context!");
+        
+
     }
 
+    
+    
     @Override
-    public InvocationStyle getInvocationStyle() {
-        return InvocationStyle.BLOCKING;
+    public void replyAsync(String foreignKey) {
+        if (__log.isDebugEnabled()) 
+            __log.debug("replyAsync mex=" + _mexId);
+
+        sync();
+        
+        if (!_blocked)
+            throw new BpelEngineException("Invalid context for replyAsync(); can only be called during MessageExchangeContext call. ");
+        
+        // TODO: shouldn't this set _blocked? 
+        
+        checkReplyContextOk();
+        setStatus(Status.ASYNC);
+        _foreignKey = foreignKey;
+        sync();
+
+    }
+
+
+    /**
+     * Method used by server to wait until a response is available. 
+     */
+    Status waitForResponse() {
+        // TODO: actually wait for response.
+        return getStatus();
     }