You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2007/08/06 22:47:59 UTC

svn commit: r563267 [2/2] - in /ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel: engine/ memdao/

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java?view=diff&rev=563267&r1=563266&r2=563267
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java Mon Aug  6 13:47:58 2007
@@ -19,6 +19,11 @@
 
 package org.apache.ode.bpel.engine;
 
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
 import javax.wsdl.Operation;
 import javax.xml.namespace.QName;
 
@@ -35,6 +40,8 @@
 import org.apache.ode.bpel.o.OPartnerLink;
 import org.w3c.dom.Element;
 
+import com.sun.corba.se.spi.activation._ActivatorImplBase;
+
 /**
  * Base-class implementation of the interface used to expose a partner invocation to the integration layer.
  * 
@@ -51,22 +58,33 @@
 
     protected volatile String _foreignKey;
 
+    protected Lock _accessLock = new ReentrantLock();
+
+    protected Condition _stateChanged = _accessLock.newCondition();
+    protected Condition _acked = _accessLock.newCondition();
+
     private QName _caller;
 
-    /** thread-local indicator telling us if a given thread is the thread that "owns" the object. */
-    final ThreadLocal<Boolean> _ownerThread = new ThreadLocal<Boolean>() {
-        @Override
-        protected Boolean initialValue() {
-            return false;
-        }
+    /** the states for a partner mex. */
+    enum State {
+        /** state when we're in one of the MexContext.invokeXXX methods. */
+        INVOKE_XXX,
 
+        /** hold all actions (blocks the IL) */
+        HOLD,
+
+        /** the MEX is ASYNC ("in the wild"), i.e. a response can come at any moment from any thread. */
+        ASYNC,
+
+        /** the MEX is dead, it should no longer be accessed by the IL */
+        DEAD
     };
 
-    volatile boolean _blocked = false;
+    protected State _state = State.INVOKE_XXX;
 
-    PartnerRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation operation,
+    PartnerRoleMessageExchangeImpl(BpelProcess process, Long iid, String mexId, OPartnerLink oplink, Operation operation,
             EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel channel) {
-        super(process, mexId, oplink, oplink.partnerRolePortType, operation);
+        super(process, iid, mexId, oplink, oplink.partnerRolePortType, operation);
         _myRoleEPR = myRoleEPR;
         _partnerRoleChannel = channel;
     }
@@ -78,49 +96,75 @@
     }
 
     @Override
-    public void save(MessageExchangeDAO dao) {
+    void save(MessageExchangeDAO dao) {
         super.save(dao);
     }
 
+    @Override
+    void ack(AckType acktype) {
+        _accessLock.lock();
+        try {
+            super.ack(acktype);
+            _acked.signalAll();
+        } finally {
+            _accessLock.unlock();
+        }
+    }
+    
     public void replyAsync(String foreignKey) {
-        throw new BpelEngineException("replyAsync() is not supported for invocation style " + getInvocationStyle());
+        throw new IllegalStateException("replyAsync() is not supported for invocation style " + getInvocationStyle());
     }
 
     public void replyOneWayOk() {
         if (__log.isDebugEnabled()) {
             __log.debug("replyOneWayOk mex=" + getMessageExchangeId());
         }
-        sync();
-        checkReplyContextOk();
-        setStatus(Status.ASYNC);
-        sync();
+
+        _accessLock.lock();
+        try {
+            checkReplyContextOk();
+            ack(AckType.ONEWAY);
+        } finally {
+            _accessLock.unlock();
+        }
     }
 
     public void replyWithFault(QName faultType, Message outputFaultMessage) throws BpelEngineException {
         if (__log.isDebugEnabled()) {
             __log.debug("replyWithFault mex=" + getMessageExchangeId());
         }
-        sync();
-        checkReplyContextOk();
-        _fault = faultType;
-        _response = (MessageImpl) outputFaultMessage;
-        setStatus(Status.FAULT);
-        sync();
-        if (!_blocked)
-            resumeInstance();
+
+        _accessLock.lock();
+        try {
+            checkReplyContextOk();
+            _fault = faultType;
+            _failureType = null;
+            _response = (MessageImpl) outputFaultMessage;
+            ack(AckType.FAULT);            
+            if (_state == State.ASYNC)
+                asyncACK();
+        } finally {
+            _accessLock.unlock();
+        }
     }
 
     public void reply(Message response) throws BpelEngineException {
         if (__log.isDebugEnabled()) {
             __log.debug("reply mex=" + getMessageExchangeId());
         }
-        sync();
-        checkReplyContextOk();
-        _response = (MessageImpl) response;
-        setStatus(Status.RESPONSE);
-        sync();
-        if (!_blocked)
-            resumeInstance();
+
+        _accessLock.lock();
+        try {
+            checkReplyContextOk();
+            _response = (MessageImpl) response;
+            _fault = null;
+            _failureType = null;
+            ack(AckType.RESPONSE);
+            if (_state == State.ASYNC)
+                asyncACK();
+        } finally {
+            _accessLock.unlock();
+        }
 
     }
 
@@ -128,14 +172,20 @@
         if (__log.isDebugEnabled()) {
             __log.debug("replyWithFailure mex=" + getMessageExchangeId());
         }
-        sync();
-        checkReplyContextOk();
-        _failureType = type;
-        _explanation = description;
-        setStatus(Status.FAILURE);
-        sync();
-        if (!_blocked)
-            resumeInstance();
+
+        _accessLock.lock();
+        try {
+            checkReplyContextOk();
+            _failureType = type;
+            _explanation = description;
+            _fault = null;
+            _response = null;
+            ack(AckType.FAILURE);
+            if (_state == State.ASYNC)
+                asyncACK();
+        } finally {
+            _accessLock.unlock();
+        }
     }
 
     public QName getCaller() {
@@ -168,28 +218,47 @@
      * for ASYNC and RELIABLE invocations.
      * 
      */
-    protected void resumeInstance() {
-        assert false : "should not get resumeInstance() call";
-        throw new IllegalStateException("InternalError: unexpected state");
-    }
+    protected abstract void asyncACK();
+    
+    
+    protected void checkReplyContextOk() {
+        // Prevent duplicate replies.
+        while (_state == State.HOLD)
+            try {
+                _stateChanged.await();
+            } catch (InterruptedException e) {
+                throw new BpelEngineException("Thread Interrupted.", e);
+            }
 
-    protected WorkEvent generateInvokeResponseWorkEvent() {
-        WorkEvent we = new WorkEvent();
-        we.setProcessId(_process.getPID());
-        we.setIID(_iid);
-        we.setType(WorkEvent.Type.PARTNER_RESPONSE);
-        we.setChannel(_responseChannel);
-        we.setMexId(_mexId);
+        if (_state == State.DEAD)
+            throw new IllegalStateException("Object used in inappropriate context. ");
 
-        return we;
+        if (getStatus() != MessageExchange.Status.REQ)
+            throw new IllegalStateException("Invalid message exchange state, expect REQUEST or ASYNC, but got " + getStatus());
 
     }
 
-    protected void checkReplyContextOk() {
-        // Prevent duplicate replies.
-        if (getStatus() != MessageExchange.Status.REQUEST && getStatus() != MessageExchange.Status.ASYNC)
-            throw new BpelEngineException("Invalid message exchange state, expect REQUEST or ASYNC, but got " + getStatus());
-
+    void setState(State newstate) {
+        _accessLock.lock();
+        try {
+            _state = newstate;
+            _stateChanged.signalAll();
+        } finally {
+            _accessLock.unlock();
+        }
+    }
+    
+    public boolean waitForAck(long timeout) throws InterruptedException  {
+        _accessLock.lock();
+        try {
+            if (getStatus() != Status.ACK) 
+                return _acked.await(timeout,TimeUnit.MILLISECONDS);
+            else
+                return true;
+        } finally {
+            _accessLock.unlock();
+        }
     }
 
+    
 }

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java?view=diff&rev=563267&r1=563266&r2=563267
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java Mon Aug  6 13:47:58 2007
@@ -19,33 +19,17 @@
 
 package org.apache.ode.bpel.engine;
 
+import javax.wsdl.Operation;
+import javax.xml.namespace.QName;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.ode.bpel.dao.MessageDAO;
-import org.apache.ode.bpel.dao.MessageExchangeDAO;
 import org.apache.ode.bpel.iapi.BpelEngineException;
 import org.apache.ode.bpel.iapi.InvocationStyle;
-import org.apache.ode.bpel.iapi.Message;
-import org.apache.ode.bpel.iapi.MessageExchange;
 import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
-import org.apache.ode.bpel.iapi.Scheduler;
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus;
-import org.apache.ode.bpel.intercept.AbortMessageExchangeException;
-import org.apache.ode.bpel.intercept.FaultMessageExchangeException;
 import org.apache.ode.bpel.intercept.InterceptorInvoker;
-import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
-import org.apache.ode.bpel.intercept.MessageExchangeInterceptor.InterceptorContext;
 import org.apache.ode.bpel.o.OPartnerLink;
 
-import javax.wsdl.Operation;
-import javax.xml.namespace.QName;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
 /**
  * Provides an implementation of the {@link MyRoleMessageExchange} inteface for interactions performed in the
  * {@link InvocationStyle#RELIABLE} style.
@@ -68,7 +52,7 @@
         assertTransaction();
 
         // Cover the case where invoke was already called. 
-        if (getStatus() == Status.REQUEST)
+        if (getStatus() == Status.REQ)
             return;
         
         if (getStatus() != Status.NEW)
@@ -80,7 +64,9 @@
         
         if (__log.isDebugEnabled())
             __log.debug("invoke() EPR= " + _epr + " ==> " + _process);
-        setStatus(Status.REQUEST);
+        
+        request();
+        
         save(getDAO());
         scheduleInvoke();
     }

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java?view=diff&rev=563267&r1=563266&r2=563267
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java Mon Aug  6 13:47:58 2007
@@ -3,56 +3,69 @@
 import javax.wsdl.Operation;
 
 import org.apache.ode.bpel.dao.MessageExchangeDAO;
-import org.apache.ode.bpel.engine.MessageExchangeImpl.InDbAction;
 import org.apache.ode.bpel.iapi.BpelEngineException;
 import org.apache.ode.bpel.iapi.EndpointReference;
 import org.apache.ode.bpel.iapi.InvocationStyle;
 import org.apache.ode.bpel.iapi.PartnerRoleChannel;
-import org.apache.ode.bpel.iapi.MessageExchange.Status;
 import org.apache.ode.bpel.o.OPartnerLink;
 
 public class ReliablePartnerRoleMessageExchangeImpl extends PartnerRoleMessageExchangeImpl {
 
-    public ReliablePartnerRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation op, EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel partnerRoleChannel) {
-        super(process, mexId, oplink, op, epr, myRoleEPR, partnerRoleChannel);
+    public ReliablePartnerRoleMessageExchangeImpl(BpelProcess process, long iid, String mexId, OPartnerLink oplink, Operation op,
+            EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel partnerRoleChannel) {
+        super(process, iid, mexId, oplink, op, epr, myRoleEPR, partnerRoleChannel);
     }
 
-    
     @Override
     protected void checkReplyContextOk() {
         super.checkReplyContextOk();
-        
+
         if (!_contexts.isTransacted())
             throw new BpelEngineException("Cannot replyXXX from non-transaction context!");
     }
 
-
     @Override
     public void replyAsync(String foreignKey) {
-        if (!_blocked)
-            throw new BpelEngineException("Invalid context for replyAsync(); can only be called during MessageExchangeContext call. ");
-        checkReplyContextOk();
-        setStatus(Status.ASYNC);
-        _foreignKey = foreignKey;
+        _accessLock.lock();
+        try {
+            checkReplyContextOk();
+
+            if (_state != State.INVOKE_XXX)
+                throw new IllegalStateException(
+                    "Invalid context for replyAsync(); can only be called during MessageExchangeContext call. ");
+            
+            _foreignKey = foreignKey;
+        } finally {
+            _accessLock.unlock();
+        }
     }
 
-
     @Override
-    protected void resumeInstance() {
+    protected void asyncACK() {
         // TODO Auto-generated method stub
         assert _contexts.isTransacted() : "checkReplyContext() should have prevented us from getting here.";
         assert !_process.isInMemory() : "resumeInstance() for reliable in-mem processes makes no sense.";
 
-        final WorkEvent we = generateInvokeResponseWorkEvent();
-
-        save(getDAO());
+        MessageExchangeDAO mexdao = getDAO();
+        final WorkEvent we = generatePartnerResponseWorkEvent(mexdao);
+        save(mexdao);
         _contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
     }
 
+
     @Override
     public InvocationStyle getInvocationStyle() {
         return InvocationStyle.RELIABLE;
     }
-    
-    
+
+    private WorkEvent generatePartnerResponseWorkEvent(MessageExchangeDAO mexdao) {
+        WorkEvent we = new WorkEvent();
+        we.setProcessId(_process.getPID());
+        we.setChannel(mexdao.getChannel());
+        we.setIID(_iid);
+        we.setMexId(_mexId);
+        we.setType(WorkEvent.Type.PARTNER_RESPONSE);
+        return we;
+    }
+
 }

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java?view=diff&rev=563267&r1=563266&r2=563267
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java Mon Aug  6 13:47:58 2007
@@ -28,7 +28,7 @@
         assertTransaction();
        
         _process.invokeProcess(getDAO());
-        if (MessageExchange.Status.valueOf(getDAO().getStatus()) != Status.RESPONSE)
+        if (MessageExchange.Status.valueOf(getDAO().getStatus()) != Status.ACK)
             throw new BpelEngineException("Transactional invoke on process did not yield a response.");
         return Status.valueOf(getDAO().getStatus());
         

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedPartnerRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedPartnerRoleMessageExchangeImpl.java?view=diff&rev=563267&r1=563266&r2=563267
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedPartnerRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedPartnerRoleMessageExchangeImpl.java Mon Aug  6 13:47:58 2007
@@ -18,8 +18,8 @@
  */
 public class TransactedPartnerRoleMessageExchangeImpl extends PartnerRoleMessageExchangeImpl {
 
-    TransactedPartnerRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation operation,EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel channel) {
-        super(process, mexId, oplink, operation,  epr, myRoleEPR, channel);
+    TransactedPartnerRoleMessageExchangeImpl(BpelProcess process, long iid, String mexId, OPartnerLink oplink,Operation operation, EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel channel) {
+        super(process, iid, mexId, oplink,  operation, epr, myRoleEPR, channel);
     }
     
     
@@ -31,10 +31,8 @@
      */
     @Override
     protected void checkReplyContextOk() {
-        if (!_blocked)
-            throw new BpelEngineException("replyXXX operation attempted outside of BLOCKING region!");
-        if (!_ownerThread.get())
-            throw new BpelEngineException("replyXXX operation attempted from foreign thread!");
+        if (_state != State.INVOKE_XXX)
+            throw new BpelEngineException("replyXXX operation attempted outside of transacted region!");
         
         assert _contexts.isTransacted() : "Internal Error: owner thread must be transactional!?!?!!?"; 
     }
@@ -43,6 +41,13 @@
     @Override
     public InvocationStyle getInvocationStyle() {
         return InvocationStyle.TRANSACTED;
+    }
+
+
+    @Override
+    protected void asyncACK() {
+        throw new IllegalStateException("Async responses not supported for transaction invocations.");
+        
     }
 
 

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java?view=diff&rev=563267&r1=563266&r2=563267
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java Mon Aug  6 13:47:58 2007
@@ -35,17 +35,11 @@
     /**
      * Override the setStatus(...) to notify our future when there is a response/failure.
      */
-    protected void setStatus(Status status) {
+    protected void ack(AckType acktype) {
         Status old = getStatus();
-        super.setStatus(status);
+        super.ack(acktype);
         if (_future != null) {
-            if (getMessageExchangePattern() == MessageExchangePattern.REQUEST_ONLY) {
-                if (old == Status.REQUEST && old != status)
-                    _future.done(status);
-            } else /* two-way */ {
-                if ((old == Status.ASYNC || old == Status.REQUEST) && status != Status.ASYNC)
-                    _future.done(status);
-            }
+            _future.done(Status.ACK);
         }
     }
 
@@ -60,7 +54,7 @@
         _process.enqueueTransaction(new Callable<Void>() {
 
             public Void call() throws Exception {
-                UnreliableMyRoleMessageExchangeImpl.super.setStatus(Status.REQUEST);
+                request();
                 MessageExchangeDAO dao = _process.createMessageExchange(getMessageExchangeId(), MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
                 save(dao);
                 if (_process.isInMemory()) 

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java?view=diff&rev=563267&r1=563266&r2=563267
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java Mon Aug  6 13:47:58 2007
@@ -5,109 +5,87 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ode.bpel.dao.MessageExchangeDAO;
-import org.apache.ode.bpel.engine.MessageExchangeImpl.InDbAction;
 import org.apache.ode.bpel.iapi.BpelEngineException;
 import org.apache.ode.bpel.iapi.EndpointReference;
 import org.apache.ode.bpel.iapi.InvocationStyle;
-import org.apache.ode.bpel.iapi.MessageExchangeContext;
 import org.apache.ode.bpel.iapi.PartnerRoleChannel;
 import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
-import org.apache.ode.bpel.iapi.MessageExchange.Status;
 import org.apache.ode.bpel.o.OPartnerLink;
 
 /**
- * Implementation of the {@link PartnerRoleMessageExchange} interface that is passed to the IL when the 
- * UNRELIABLE invocation style is used (see {@link InvocationStyle#UNRELIABLE}). The basic idea here is 
- * that with this style, the IL performs the operation outside of a transactional context. It can either
- * finish it right away (BLOCK) or indicate that the response will be provided later (replyASYNC).  
- *
- *  
- *  TODO: serious synchronization issues in this class.
- *  
+ * Implementation of the {@link PartnerRoleMessageExchange} interface that is passed to the IL when the UNRELIABLE invocation style
+ * is used (see {@link InvocationStyle#UNRELIABLE}). The basic idea here is that with this style, the IL performs the operation
+ * outside of a transactional context. It can either finish it right away (BLOCK) or indicate that the response will be provided
+ * later (replyASYNC).
+ * 
+ * 
+ * 
  * @author Maciej Szefler <mszefler at gmail dot com>
- *
+ * 
  */
 public class UnreliablePartnerRoleMessageExchangeImpl extends PartnerRoleMessageExchangeImpl {
     private static final Log __log = LogFactory.getLog(UnreliablePartnerRoleMessageExchangeImpl.class);
-    
+    boolean _asyncReply;
 
-    UnreliablePartnerRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation operation, EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel channel) {
-        super(process, mexId, oplink, operation, epr, myRoleEPR, channel);
+    UnreliablePartnerRoleMessageExchangeImpl(BpelProcess process, long iid, String mexId, OPartnerLink oplink, Operation operation,
+            EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel channel) {
+        super(process, iid, mexId, oplink, operation, epr, myRoleEPR, channel);
     }
 
-
     @Override
     public InvocationStyle getInvocationStyle() {
         return InvocationStyle.UNRELIABLE;
     }
 
-
     @Override
-    protected void resumeInstance() {
+    protected void asyncACK() {
         assert !_contexts.isTransacted() : "checkReplyContext() should have prevented us from getting here.";
         assert !_process.isInMemory() : "resumeInstance() for in-mem processes makes no sense.";
 
-        final WorkEvent we = generateInvokeResponseWorkEvent();
         if (__log.isDebugEnabled()) {
-            __log.debug("resumeInstance: scheduling WorkEvent " + we);
+            __log.debug("asyncResponseReceived: for IID " + getIID() );
         }
 
-
-        doInTX(new InDbAction<Void>() {
-
-            public Void call(MessageExchangeDAO mexdao) {
-                save(mexdao);
-                _contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
-                return null;
+        _process.scheduleInstanceWork(getIID(), _process._server.new TransactedRunnable(new Runnable() {
+            public void run() {
+                MessageExchangeDAO dao = getDAO();
+                save(dao);
+                _process.executeContinueInstancePartnerRoleResponseReceived(dao);
             }
-        });
+
+        }));
     }
 
     @Override
     protected void checkReplyContextOk() {
         super.checkReplyContextOk();
 
-        if (!_blocked && getStatus() != Status.ASYNC)
-            throw new BpelEngineException("replyXXX operation attempted outside of BLOCKING region!");
-
-        // Prevent user from attempting the replyXXXX calls while a transaction is active. 
+        // Prevent user from attempting the replyXXXX calls while a transaction is active.
         if (_contexts.isTransacted())
             throw new BpelEngineException("Cannot reply to UNRELIABLE style invocation from a transactional context!");
-        
 
     }
 
-    
-    
     @Override
     public void replyAsync(String foreignKey) {
-        if (__log.isDebugEnabled()) 
+        if (__log.isDebugEnabled())
             __log.debug("replyAsync mex=" + _mexId);
 
-        sync();
-        
-        if (!_blocked)
-            throw new BpelEngineException("Invalid context for replyAsync(); can only be called during MessageExchangeContext call. ");
-        
-        // TODO: shouldn't this set _blocked? 
-        
-        checkReplyContextOk();
-        setStatus(Status.ASYNC);
-        _foreignKey = foreignKey;
-        sync();
-
-    }
-
-
-    /**
-     * Method used by server to wait until a response is available. 
-     */
-    Status waitForResponse() {
-        // TODO: actually wait for response.
-        return getStatus();
+        _accessLock.lock();
+        try {
+            checkReplyContextOk();
+
+            if (_state != State.INVOKE_XXX)
+                throw new IllegalStateException(
+                        "Invalid context for replyAsync(); can only be called during MessageExchangeContext call. ");
+
+            _asyncReply = true;
+            _foreignKey = foreignKey;
+        } finally {
+            _accessLock.unlock();
+        }
     }
 
-
     
+ 
 }
-

Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java?view=diff&rev=563267&r1=563266&r2=563267
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java Mon Aug  6 13:47:58 2007
@@ -32,6 +32,7 @@
 import org.apache.ode.bpel.dao.PartnerLinkDAO;
 import org.apache.ode.bpel.dao.ProcessDAO;
 import org.apache.ode.bpel.dao.ProcessInstanceDAO;
+import org.apache.ode.bpel.iapi.MessageExchange.AckType;
 import org.w3c.dom.Element;
 
 public class MessageExchangeDAOImpl extends DaoBaseImpl implements MessageExchangeDAO {
@@ -61,6 +62,7 @@
     private MessageExchangeDAO _pipedExchange;
     private String _failureType;
     private long _timeout;
+    private AckType _ackType;
 
 	public MessageExchangeDAOImpl(char direction, String messageEchangeId){
 		this.direction = direction;
@@ -299,5 +301,13 @@
 
     public void setTimeout(long timeout) {
         _timeout = timeout;
+    }
+
+    public AckType getAckType() {
+        return _ackType;
+    }
+
+    public void setAckType(AckType ackType) {
+        _ackType = ackType;
     }
 }