You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2007/08/06 22:47:59 UTC
svn commit: r563267 [2/2] - in
/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel: engine/
memdao/
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java?view=diff&rev=563267&r1=563266&r2=563267
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java Mon Aug 6 13:47:58 2007
@@ -19,6 +19,11 @@
package org.apache.ode.bpel.engine;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
import javax.wsdl.Operation;
import javax.xml.namespace.QName;
@@ -35,6 +40,8 @@
import org.apache.ode.bpel.o.OPartnerLink;
import org.w3c.dom.Element;
+import com.sun.corba.se.spi.activation._ActivatorImplBase;
+
/**
* Base-class implementation of the interface used to expose a partner invocation to the integration layer.
*
@@ -51,22 +58,33 @@
protected volatile String _foreignKey;
+ protected Lock _accessLock = new ReentrantLock();
+
+ protected Condition _stateChanged = _accessLock.newCondition();
+ protected Condition _acked = _accessLock.newCondition();
+
private QName _caller;
- /** thread-local indicator telling us if a given thread is the thread that "owns" the object. */
- final ThreadLocal<Boolean> _ownerThread = new ThreadLocal<Boolean>() {
- @Override
- protected Boolean initialValue() {
- return false;
- }
+ /** the states for a partner mex. */
+ enum State {
+ /** state when we're in one of the MexContext.invokeXXX methods. */
+ INVOKE_XXX,
+ /** hold all actions (blocks the IL) */
+ HOLD,
+
+ /** the MEX is ASYNC ("in the wild"), i.e. a response can come at any moment from any thread. */
+ ASYNC,
+
+ /** the MEX is dead, it should no longer be accessed by the IL */
+ DEAD
};
- volatile boolean _blocked = false;
+ protected State _state = State.INVOKE_XXX;
- PartnerRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation operation,
+ PartnerRoleMessageExchangeImpl(BpelProcess process, Long iid, String mexId, OPartnerLink oplink, Operation operation,
EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel channel) {
- super(process, mexId, oplink, oplink.partnerRolePortType, operation);
+ super(process, iid, mexId, oplink, oplink.partnerRolePortType, operation);
_myRoleEPR = myRoleEPR;
_partnerRoleChannel = channel;
}
@@ -78,49 +96,75 @@
}
@Override
- public void save(MessageExchangeDAO dao) {
+ void save(MessageExchangeDAO dao) {
super.save(dao);
}
+ @Override
+ void ack(AckType acktype) {
+ _accessLock.lock();
+ try {
+ super.ack(acktype);
+ _acked.signalAll();
+ } finally {
+ _accessLock.unlock();
+ }
+ }
+
public void replyAsync(String foreignKey) {
- throw new BpelEngineException("replyAsync() is not supported for invocation style " + getInvocationStyle());
+ throw new IllegalStateException("replyAsync() is not supported for invocation style " + getInvocationStyle());
}
public void replyOneWayOk() {
if (__log.isDebugEnabled()) {
__log.debug("replyOneWayOk mex=" + getMessageExchangeId());
}
- sync();
- checkReplyContextOk();
- setStatus(Status.ASYNC);
- sync();
+
+ _accessLock.lock();
+ try {
+ checkReplyContextOk();
+ ack(AckType.ONEWAY);
+ } finally {
+ _accessLock.unlock();
+ }
}
public void replyWithFault(QName faultType, Message outputFaultMessage) throws BpelEngineException {
if (__log.isDebugEnabled()) {
__log.debug("replyWithFault mex=" + getMessageExchangeId());
}
- sync();
- checkReplyContextOk();
- _fault = faultType;
- _response = (MessageImpl) outputFaultMessage;
- setStatus(Status.FAULT);
- sync();
- if (!_blocked)
- resumeInstance();
+
+ _accessLock.lock();
+ try {
+ checkReplyContextOk();
+ _fault = faultType;
+ _failureType = null;
+ _response = (MessageImpl) outputFaultMessage;
+ ack(AckType.FAULT);
+ if (_state == State.ASYNC)
+ asyncACK();
+ } finally {
+ _accessLock.unlock();
+ }
}
public void reply(Message response) throws BpelEngineException {
if (__log.isDebugEnabled()) {
__log.debug("reply mex=" + getMessageExchangeId());
}
- sync();
- checkReplyContextOk();
- _response = (MessageImpl) response;
- setStatus(Status.RESPONSE);
- sync();
- if (!_blocked)
- resumeInstance();
+
+ _accessLock.lock();
+ try {
+ checkReplyContextOk();
+ _response = (MessageImpl) response;
+ _fault = null;
+ _failureType = null;
+ ack(AckType.RESPONSE);
+ if (_state == State.ASYNC)
+ asyncACK();
+ } finally {
+ _accessLock.unlock();
+ }
}
@@ -128,14 +172,20 @@
if (__log.isDebugEnabled()) {
__log.debug("replyWithFailure mex=" + getMessageExchangeId());
}
- sync();
- checkReplyContextOk();
- _failureType = type;
- _explanation = description;
- setStatus(Status.FAILURE);
- sync();
- if (!_blocked)
- resumeInstance();
+
+ _accessLock.lock();
+ try {
+ checkReplyContextOk();
+ _failureType = type;
+ _explanation = description;
+ _fault = null;
+ _response = null;
+ ack(AckType.FAILURE);
+ if (_state == State.ASYNC)
+ asyncACK();
+ } finally {
+ _accessLock.unlock();
+ }
}
public QName getCaller() {
@@ -168,28 +218,47 @@
* for ASYNC and RELIABLE invocations.
*
*/
- protected void resumeInstance() {
- assert false : "should not get resumeInstance() call";
- throw new IllegalStateException("InternalError: unexpected state");
- }
+ protected abstract void asyncACK();
+
+
+ protected void checkReplyContextOk() {
+ // Prevent duplicate replies.
+ while (_state == State.HOLD)
+ try {
+ _stateChanged.await();
+ } catch (InterruptedException e) {
+ throw new BpelEngineException("Thread Interrupted.", e);
+ }
- protected WorkEvent generateInvokeResponseWorkEvent() {
- WorkEvent we = new WorkEvent();
- we.setProcessId(_process.getPID());
- we.setIID(_iid);
- we.setType(WorkEvent.Type.PARTNER_RESPONSE);
- we.setChannel(_responseChannel);
- we.setMexId(_mexId);
+ if (_state == State.DEAD)
+ throw new IllegalStateException("Object used in inappropriate context. ");
- return we;
+ if (getStatus() != MessageExchange.Status.REQ)
+ throw new IllegalStateException("Invalid message exchange state, expect REQUEST or ASYNC, but got " + getStatus());
}
- protected void checkReplyContextOk() {
- // Prevent duplicate replies.
- if (getStatus() != MessageExchange.Status.REQUEST && getStatus() != MessageExchange.Status.ASYNC)
- throw new BpelEngineException("Invalid message exchange state, expect REQUEST or ASYNC, but got " + getStatus());
-
+ void setState(State newstate) {
+ _accessLock.lock();
+ try {
+ _state = newstate;
+ _stateChanged.signalAll();
+ } finally {
+ _accessLock.unlock();
+ }
+ }
+
+ public boolean waitForAck(long timeout) throws InterruptedException {
+ _accessLock.lock();
+ try {
+ if (getStatus() != Status.ACK)
+ return _acked.await(timeout,TimeUnit.MILLISECONDS);
+ else
+ return true;
+ } finally {
+ _accessLock.unlock();
+ }
}
+
}
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java?view=diff&rev=563267&r1=563266&r2=563267
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java Mon Aug 6 13:47:58 2007
@@ -19,33 +19,17 @@
package org.apache.ode.bpel.engine;
+import javax.wsdl.Operation;
+import javax.xml.namespace.QName;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.ode.bpel.dao.MessageDAO;
-import org.apache.ode.bpel.dao.MessageExchangeDAO;
import org.apache.ode.bpel.iapi.BpelEngineException;
import org.apache.ode.bpel.iapi.InvocationStyle;
-import org.apache.ode.bpel.iapi.Message;
-import org.apache.ode.bpel.iapi.MessageExchange;
import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
-import org.apache.ode.bpel.iapi.Scheduler;
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus;
-import org.apache.ode.bpel.intercept.AbortMessageExchangeException;
-import org.apache.ode.bpel.intercept.FaultMessageExchangeException;
import org.apache.ode.bpel.intercept.InterceptorInvoker;
-import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
-import org.apache.ode.bpel.intercept.MessageExchangeInterceptor.InterceptorContext;
import org.apache.ode.bpel.o.OPartnerLink;
-import javax.wsdl.Operation;
-import javax.xml.namespace.QName;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
/**
* Provides an implementation of the {@link MyRoleMessageExchange} inteface for interactions performed in the
* {@link InvocationStyle#RELIABLE} style.
@@ -68,7 +52,7 @@
assertTransaction();
// Cover the case where invoke was already called.
- if (getStatus() == Status.REQUEST)
+ if (getStatus() == Status.REQ)
return;
if (getStatus() != Status.NEW)
@@ -80,7 +64,9 @@
if (__log.isDebugEnabled())
__log.debug("invoke() EPR= " + _epr + " ==> " + _process);
- setStatus(Status.REQUEST);
+
+ request();
+
save(getDAO());
scheduleInvoke();
}
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java?view=diff&rev=563267&r1=563266&r2=563267
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java Mon Aug 6 13:47:58 2007
@@ -3,56 +3,69 @@
import javax.wsdl.Operation;
import org.apache.ode.bpel.dao.MessageExchangeDAO;
-import org.apache.ode.bpel.engine.MessageExchangeImpl.InDbAction;
import org.apache.ode.bpel.iapi.BpelEngineException;
import org.apache.ode.bpel.iapi.EndpointReference;
import org.apache.ode.bpel.iapi.InvocationStyle;
import org.apache.ode.bpel.iapi.PartnerRoleChannel;
-import org.apache.ode.bpel.iapi.MessageExchange.Status;
import org.apache.ode.bpel.o.OPartnerLink;
public class ReliablePartnerRoleMessageExchangeImpl extends PartnerRoleMessageExchangeImpl {
- public ReliablePartnerRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation op, EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel partnerRoleChannel) {
- super(process, mexId, oplink, op, epr, myRoleEPR, partnerRoleChannel);
+ public ReliablePartnerRoleMessageExchangeImpl(BpelProcess process, long iid, String mexId, OPartnerLink oplink, Operation op,
+ EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel partnerRoleChannel) {
+ super(process, iid, mexId, oplink, op, epr, myRoleEPR, partnerRoleChannel);
}
-
@Override
protected void checkReplyContextOk() {
super.checkReplyContextOk();
-
+
if (!_contexts.isTransacted())
throw new BpelEngineException("Cannot replyXXX from non-transaction context!");
}
-
@Override
public void replyAsync(String foreignKey) {
- if (!_blocked)
- throw new BpelEngineException("Invalid context for replyAsync(); can only be called during MessageExchangeContext call. ");
- checkReplyContextOk();
- setStatus(Status.ASYNC);
- _foreignKey = foreignKey;
+ _accessLock.lock();
+ try {
+ checkReplyContextOk();
+
+ if (_state != State.INVOKE_XXX)
+ throw new IllegalStateException(
+ "Invalid context for replyAsync(); can only be called during MessageExchangeContext call. ");
+
+ _foreignKey = foreignKey;
+ } finally {
+ _accessLock.unlock();
+ }
}
-
@Override
- protected void resumeInstance() {
+ protected void asyncACK() {
// TODO Auto-generated method stub
assert _contexts.isTransacted() : "checkReplyContext() should have prevented us from getting here.";
assert !_process.isInMemory() : "resumeInstance() for reliable in-mem processes makes no sense.";
- final WorkEvent we = generateInvokeResponseWorkEvent();
-
- save(getDAO());
+ MessageExchangeDAO mexdao = getDAO();
+ final WorkEvent we = generatePartnerResponseWorkEvent(mexdao);
+ save(mexdao);
_contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
}
+
@Override
public InvocationStyle getInvocationStyle() {
return InvocationStyle.RELIABLE;
}
-
-
+
+ private WorkEvent generatePartnerResponseWorkEvent(MessageExchangeDAO mexdao) {
+ WorkEvent we = new WorkEvent();
+ we.setProcessId(_process.getPID());
+ we.setChannel(mexdao.getChannel());
+ we.setIID(_iid);
+ we.setMexId(_mexId);
+ we.setType(WorkEvent.Type.PARTNER_RESPONSE);
+ return we;
+ }
+
}
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java?view=diff&rev=563267&r1=563266&r2=563267
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java Mon Aug 6 13:47:58 2007
@@ -28,7 +28,7 @@
assertTransaction();
_process.invokeProcess(getDAO());
- if (MessageExchange.Status.valueOf(getDAO().getStatus()) != Status.RESPONSE)
+ if (MessageExchange.Status.valueOf(getDAO().getStatus()) != Status.ACK)
throw new BpelEngineException("Transactional invoke on process did not yield a response.");
return Status.valueOf(getDAO().getStatus());
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedPartnerRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedPartnerRoleMessageExchangeImpl.java?view=diff&rev=563267&r1=563266&r2=563267
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedPartnerRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedPartnerRoleMessageExchangeImpl.java Mon Aug 6 13:47:58 2007
@@ -18,8 +18,8 @@
*/
public class TransactedPartnerRoleMessageExchangeImpl extends PartnerRoleMessageExchangeImpl {
- TransactedPartnerRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation operation,EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel channel) {
- super(process, mexId, oplink, operation, epr, myRoleEPR, channel);
+ TransactedPartnerRoleMessageExchangeImpl(BpelProcess process, long iid, String mexId, OPartnerLink oplink,Operation operation, EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel channel) {
+ super(process, iid, mexId, oplink, operation, epr, myRoleEPR, channel);
}
@@ -31,10 +31,8 @@
*/
@Override
protected void checkReplyContextOk() {
- if (!_blocked)
- throw new BpelEngineException("replyXXX operation attempted outside of BLOCKING region!");
- if (!_ownerThread.get())
- throw new BpelEngineException("replyXXX operation attempted from foreign thread!");
+ if (_state != State.INVOKE_XXX)
+ throw new BpelEngineException("replyXXX operation attempted outside of transacted region!");
assert _contexts.isTransacted() : "Internal Error: owner thread must be transactional!?!?!!?";
}
@@ -43,6 +41,13 @@
@Override
public InvocationStyle getInvocationStyle() {
return InvocationStyle.TRANSACTED;
+ }
+
+
+ @Override
+ protected void asyncACK() {
+ throw new IllegalStateException("Async responses not supported for transaction invocations.");
+
}
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java?view=diff&rev=563267&r1=563266&r2=563267
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java Mon Aug 6 13:47:58 2007
@@ -35,17 +35,11 @@
/**
* Override the setStatus(...) to notify our future when there is a response/failure.
*/
- protected void setStatus(Status status) {
+ protected void ack(AckType acktype) {
Status old = getStatus();
- super.setStatus(status);
+ super.ack(acktype);
if (_future != null) {
- if (getMessageExchangePattern() == MessageExchangePattern.REQUEST_ONLY) {
- if (old == Status.REQUEST && old != status)
- _future.done(status);
- } else /* two-way */ {
- if ((old == Status.ASYNC || old == Status.REQUEST) && status != Status.ASYNC)
- _future.done(status);
- }
+ _future.done(Status.ACK);
}
}
@@ -60,7 +54,7 @@
_process.enqueueTransaction(new Callable<Void>() {
public Void call() throws Exception {
- UnreliableMyRoleMessageExchangeImpl.super.setStatus(Status.REQUEST);
+ request();
MessageExchangeDAO dao = _process.createMessageExchange(getMessageExchangeId(), MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
save(dao);
if (_process.isInMemory())
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java?view=diff&rev=563267&r1=563266&r2=563267
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java Mon Aug 6 13:47:58 2007
@@ -5,109 +5,87 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.dao.MessageExchangeDAO;
-import org.apache.ode.bpel.engine.MessageExchangeImpl.InDbAction;
import org.apache.ode.bpel.iapi.BpelEngineException;
import org.apache.ode.bpel.iapi.EndpointReference;
import org.apache.ode.bpel.iapi.InvocationStyle;
-import org.apache.ode.bpel.iapi.MessageExchangeContext;
import org.apache.ode.bpel.iapi.PartnerRoleChannel;
import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
-import org.apache.ode.bpel.iapi.MessageExchange.Status;
import org.apache.ode.bpel.o.OPartnerLink;
/**
- * Implementation of the {@link PartnerRoleMessageExchange} interface that is passed to the IL when the
- * UNRELIABLE invocation style is used (see {@link InvocationStyle#UNRELIABLE}). The basic idea here is
- * that with this style, the IL performs the operation outside of a transactional context. It can either
- * finish it right away (BLOCK) or indicate that the response will be provided later (replyASYNC).
- *
- *
- * TODO: serious synchronization issues in this class.
- *
+ * Implementation of the {@link PartnerRoleMessageExchange} interface that is passed to the IL when the UNRELIABLE invocation style
+ * is used (see {@link InvocationStyle#UNRELIABLE}). The basic idea here is that with this style, the IL performs the operation
+ * outside of a transactional context. It can either finish it right away (BLOCK) or indicate that the response will be provided
+ * later (replyASYNC).
+ *
+ *
+ *
* @author Maciej Szefler <mszefler at gmail dot com>
- *
+ *
*/
public class UnreliablePartnerRoleMessageExchangeImpl extends PartnerRoleMessageExchangeImpl {
private static final Log __log = LogFactory.getLog(UnreliablePartnerRoleMessageExchangeImpl.class);
-
+ boolean _asyncReply;
- UnreliablePartnerRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation operation, EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel channel) {
- super(process, mexId, oplink, operation, epr, myRoleEPR, channel);
+ UnreliablePartnerRoleMessageExchangeImpl(BpelProcess process, long iid, String mexId, OPartnerLink oplink, Operation operation,
+ EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel channel) {
+ super(process, iid, mexId, oplink, operation, epr, myRoleEPR, channel);
}
-
@Override
public InvocationStyle getInvocationStyle() {
return InvocationStyle.UNRELIABLE;
}
-
@Override
- protected void resumeInstance() {
+ protected void asyncACK() {
assert !_contexts.isTransacted() : "checkReplyContext() should have prevented us from getting here.";
assert !_process.isInMemory() : "resumeInstance() for in-mem processes makes no sense.";
- final WorkEvent we = generateInvokeResponseWorkEvent();
if (__log.isDebugEnabled()) {
- __log.debug("resumeInstance: scheduling WorkEvent " + we);
+ __log.debug("asyncResponseReceived: for IID " + getIID() );
}
-
- doInTX(new InDbAction<Void>() {
-
- public Void call(MessageExchangeDAO mexdao) {
- save(mexdao);
- _contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
- return null;
+ _process.scheduleInstanceWork(getIID(), _process._server.new TransactedRunnable(new Runnable() {
+ public void run() {
+ MessageExchangeDAO dao = getDAO();
+ save(dao);
+ _process.executeContinueInstancePartnerRoleResponseReceived(dao);
}
- });
+
+ }));
}
@Override
protected void checkReplyContextOk() {
super.checkReplyContextOk();
- if (!_blocked && getStatus() != Status.ASYNC)
- throw new BpelEngineException("replyXXX operation attempted outside of BLOCKING region!");
-
- // Prevent user from attempting the replyXXXX calls while a transaction is active.
+ // Prevent user from attempting the replyXXXX calls while a transaction is active.
if (_contexts.isTransacted())
throw new BpelEngineException("Cannot reply to UNRELIABLE style invocation from a transactional context!");
-
}
-
-
@Override
public void replyAsync(String foreignKey) {
- if (__log.isDebugEnabled())
+ if (__log.isDebugEnabled())
__log.debug("replyAsync mex=" + _mexId);
- sync();
-
- if (!_blocked)
- throw new BpelEngineException("Invalid context for replyAsync(); can only be called during MessageExchangeContext call. ");
-
- // TODO: shouldn't this set _blocked?
-
- checkReplyContextOk();
- setStatus(Status.ASYNC);
- _foreignKey = foreignKey;
- sync();
-
- }
-
-
- /**
- * Method used by server to wait until a response is available.
- */
- Status waitForResponse() {
- // TODO: actually wait for response.
- return getStatus();
+ _accessLock.lock();
+ try {
+ checkReplyContextOk();
+
+ if (_state != State.INVOKE_XXX)
+ throw new IllegalStateException(
+ "Invalid context for replyAsync(); can only be called during MessageExchangeContext call. ");
+
+ _asyncReply = true;
+ _foreignKey = foreignKey;
+ } finally {
+ _accessLock.unlock();
+ }
}
-
+
}
-
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java?view=diff&rev=563267&r1=563266&r2=563267
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java Mon Aug 6 13:47:58 2007
@@ -32,6 +32,7 @@
import org.apache.ode.bpel.dao.PartnerLinkDAO;
import org.apache.ode.bpel.dao.ProcessDAO;
import org.apache.ode.bpel.dao.ProcessInstanceDAO;
+import org.apache.ode.bpel.iapi.MessageExchange.AckType;
import org.w3c.dom.Element;
public class MessageExchangeDAOImpl extends DaoBaseImpl implements MessageExchangeDAO {
@@ -61,6 +62,7 @@
private MessageExchangeDAO _pipedExchange;
private String _failureType;
private long _timeout;
+ private AckType _ackType;
public MessageExchangeDAOImpl(char direction, String messageEchangeId){
this.direction = direction;
@@ -299,5 +301,13 @@
public void setTimeout(long timeout) {
_timeout = timeout;
+ }
+
+ public AckType getAckType() {
+ return _ackType;
+ }
+
+ public void setAckType(AckType ackType) {
+ _ackType = ackType;
}
}