You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2007/08/03 22:07:34 UTC
svn commit: r562569 -
/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/
Author: mszefler
Date: Fri Aug 3 13:07:33 2007
New Revision: 562569
URL: http://svn.apache.org/viewvc?view=rev&rev=562569
Log:
removed the distinction between ASYNC/BLOCKING invocation style.
Added:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
- copied, changed from r562188, ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java
- copied, changed from r561873, ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingPartnerRoleMessageExchangeImpl.java
Removed:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncPartnerRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingMyRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingPartnerRoleMessageExchangeImpl.java
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=562569&r1=562568&r2=562569
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Fri Aug 3 13:07:33 2007
@@ -33,7 +33,6 @@
import java.util.concurrent.Future;
import javax.wsdl.Operation;
-import javax.wsdl.PortType;
import javax.xml.namespace.QName;
import org.apache.commons.logging.Log;
@@ -55,7 +54,6 @@
import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
import org.apache.ode.bpel.iapi.PartnerRoleChannel;
import org.apache.ode.bpel.iapi.ProcessConf;
-import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
import org.apache.ode.bpel.iapi.MessageExchange.Status;
import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus;
import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
@@ -63,9 +61,7 @@
import org.apache.ode.bpel.intercept.InterceptorInvoker;
import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
import org.apache.ode.bpel.memdao.BpelDAOConnectionFactoryImpl;
-import org.apache.ode.bpel.o.OElementVarType;
import org.apache.ode.bpel.o.OExpressionLanguage;
-import org.apache.ode.bpel.o.OMessageVarType;
import org.apache.ode.bpel.o.OPartnerLink;
import org.apache.ode.bpel.o.OProcess;
import org.apache.ode.bpel.o.Serializer;
@@ -155,10 +151,12 @@
// TODO : do this on a per-partnerlink basis, support transacted styles.
HashSet<InvocationStyle> istyles = new HashSet<InvocationStyle>();
- istyles.add(InvocationStyle.BLOCKING);
+ istyles.add(InvocationStyle.UNRELIABLE);
+
if (!conf.isTransient()) {
- istyles.add(InvocationStyle.ASYNC);
istyles.add(InvocationStyle.RELIABLE);
+ } else {
+ istyles.add(InvocationStyle.TRANSACTED);
}
_invocationStyles = Collections.unmodifiableSet(istyles);
@@ -704,14 +702,11 @@
case RELIABLE:
mex = new ReliableMyRoleMessageExchangeImpl(this, mexId, oplink, operation, target);
break;
- case ASYNC:
- mex = new AsyncMyRoleMessageExchangeImpl(this, mexId, oplink, operation, target);
- break;
case TRANSACTED:
mex = new TransactedMyRoleMessageExchangeImpl(this, mexId, oplink, operation, target);
break;
- case BLOCKING:
- mex = new BlockingMyRoleMessageExchangeImpl(this, mexId, oplink, operation, target);
+ case UNRELIABLE:
+ mex = new UnreliableMyRoleMessageExchangeImpl(this, mexId, oplink, operation, target);
break;
default:
throw new AssertionError("Unexpected invocation style: " + istyle);
@@ -768,15 +763,10 @@
OPartnerLink plink = (OPartnerLink) _oprocess.getChild(mexdao.getPartnerLinkModelId());
Operation op = plink.getPartnerRoleOperation(mexdao.getOperation());
switch (istyle) {
- case BLOCKING:
- mex = new BlockingPartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), plink, op, null, /* EPR todo */
+ case UNRELIABLE:
+ mex = new UnreliablePartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), plink, op, null, /* EPR todo */
plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
break;
- case ASYNC:
- mex = new AsyncPartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), plink, op, null, /* EPR todo */
- plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
- break;
-
case TRANSACTED:
mex = new TransactedPartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), plink, op, null, /*
* EPR
@@ -1111,6 +1101,8 @@
}
void fireMexStateEvent(MessageExchangeDAO mexdao, Status old, Status news) {
+ // TODO: force a myrole mex to be created if it is not in cache.
+
if (old != news)
for (WeakReference<MyRoleMessageExchangeImpl> wr : _mexStateListeners) {
MyRoleMessageExchangeImpl mymex = wr.get();
@@ -1118,6 +1110,5 @@
mymex.onStateChanged(mexdao, old, news);
}
- // TODO: need to call MessageExchangeContext#onMyRoleMessageExchangeStateChanged
}
}
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?view=diff&rev=562569&r1=562568&r2=562569
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Fri Aug 3 13:07:33 2007
@@ -27,6 +27,9 @@
import java.util.List;
import java.util.Set;
+import javax.transaction.InvalidTransactionException;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
import javax.wsdl.Operation;
import javax.wsdl.PortType;
import javax.xml.namespace.QName;
@@ -89,6 +92,7 @@
import org.apache.ode.utils.GUID;
import org.apache.ode.utils.Namespaces;
import org.apache.ode.utils.ObjectPrinter;
+import org.omg.CORBA._PolicyStub;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
@@ -125,7 +129,7 @@
private List<MyRoleMessageExchangeImpl> _pendingMyRoleReplies = new LinkedList<MyRoleMessageExchangeImpl>();
private BpelInstanceWorker _instanceWorker;
-
+
private BpelProcess _bpelProcess;
/** Five second maximum for continous execution. */
@@ -135,9 +139,7 @@
private boolean _executed;
- public BpelRuntimeContextImpl(BpelInstanceWorker instanceWorker,
- ProcessInstanceDAO dao,
- PROCESS PROCESS,
+ public BpelRuntimeContextImpl(BpelInstanceWorker instanceWorker, ProcessInstanceDAO dao, PROCESS PROCESS,
MessageExchangeDAO instantiatingMessageExchange) {
_instanceWorker = instanceWorker;
_bpelProcess = instanceWorker._process;
@@ -174,6 +176,10 @@
}
}
+ public String toString() {
+ return "{BpelRuntimeCtx PID=" + _bpelProcess.getPID() + ", IID=" + _iid + "}";
+ }
+
public Long getPid() {
return _iid;
}
@@ -537,37 +543,37 @@
Status previousStatus = Status.valueOf(myrolemex.getStatus());
myrolemex.setStatus(status.toString());
- _bpelProcess.fireMexStateEvent(myrolemex, previousStatus, status);
- if (myrolemex.getPipedMessageExchange() != null) /* p2p case */{
- MessageExchangeDAO pmex = myrolemex.getPipedMessageExchange();
+ doMyRoleResponse(myrolemex, previousStatus, status);
- if (BpelProcess.__log.isDebugEnabled()) {
- __log.debug("Replying to a p2p mex, myrole " + myrolemex + " - partnerole " + pmex);
- }
-
- // In the p2p case we copy the status/response from one mex object into the other.
- pmex.setResponse(myrolemex.getResponse());
- pmex.setStatus(myrolemex.getStatus());
- continuePartnerReplied(pmex);
- myrolemex.release();
+ sendEvent(evt);
+ }
+ private void doMyRoleResponse(MessageExchangeDAO myrolemex, Status previousStatus, Status newStatus) {
+ myrolemex.setStatus(newStatus.toString());
+ if (myrolemex.getPipedMessageExchange() != null) /* p2p case */{
+ p2pResponse(myrolemex);
} else /* IL-mediated communication */{
- InvocationStyle istyle = InvocationStyle.valueOf(myrolemex.getInvocationStyle());
- switch (istyle) {
- case RELIABLE:
- scheduleReliableResponse(myrolemex);
- break;
- case ASYNC:
- scheduleAsyncResponse(myrolemex);
- break;
- default:
- break;
- }
+ _bpelProcess.fireMexStateEvent(myrolemex, previousStatus, newStatus);
+ }
+ }
+ /**
+ * Handle P2P responses.
+ * @param myrolemex
+ */
+ private void p2pResponse(MessageExchangeDAO myrolemex) {
+ MessageExchangeDAO pmex = myrolemex.getPipedMessageExchange();
+
+ if (BpelProcess.__log.isDebugEnabled()) {
+ __log.debug("Replying to a p2p mex, myrole " + myrolemex + " - partnerole " + pmex);
}
- sendEvent(evt);
+ // In the p2p case we copy the status/response from one mex object into the other.
+ pmex.setResponse(myrolemex.getResponse());
+ pmex.setStatus(myrolemex.getStatus());
+ continuePartnerReplied(pmex);
+ myrolemex.release();
}
/**
@@ -648,16 +654,26 @@
}
public void registerTimer(TimerResponseChannel timerChannel, Date timeToFire) {
+
+ if (_bpelProcess.isInMemory())
+ throw new InvalidProcessException("Process not compatible with in-memory execution.");
+
WorkEvent we = new WorkEvent();
we.setIID(_dao.getInstanceId());
+ we.setProcessId(_bpelProcess.getPID());
we.setChannel(timerChannel.export());
we.setType(WorkEvent.Type.TIMER);
_contexts.scheduler.schedulePersistedJob(we.getDetail(), timeToFire);
}
private void scheduleCorrelatorMatcher(String correlatorId, CorrelationKey key) {
+
+ if (_bpelProcess.isInMemory())
+ throw new InvalidProcessException("Process not compatible with in-memory execution.");
+
WorkEvent we = new WorkEvent();
we.setIID(_dao.getInstanceId());
+ we.setProcessId(_bpelProcess.getPID());
we.setType(WorkEvent.Type.MATCHER);
we.setCorrelatorId(correlatorId);
we.setCorrelationKey(key);
@@ -696,7 +712,8 @@
evt.setPortType(partnerLink.partnerLink.partnerRolePortType.getQName());
evt.setAspect(ProcessMessageExchangeEvent.PARTNER_INPUT);
- MessageExchangeDAO mexDao = _dao.getConnection().createMessageExchange(new GUID().toString(),MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE);
+ MessageExchangeDAO mexDao = _dao.getConnection().createMessageExchange(new GUID().toString(),
+ MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE);
mexDao.setStatus(MessageExchange.Status.NEW.toString());
mexDao.setOperation(operation.getName());
mexDao.setPortType(partnerLink.partnerLink.partnerRolePortType.getQName());
@@ -773,39 +790,83 @@
return;
}
- PortType portType = partnerLink.partnerLink.partnerRolePortType;
EndpointReference myRoleEpr = null; // TODO: how did we get this?
mexDao.setEPR(partnerEpr.toXML().getDocumentElement());
mexDao.setStatus(MessageExchange.Status.REQUEST.toString());
Set<InvocationStyle> supportedStyles = _contexts.mexContext.getSupportedInvocationStyle(partnerRoleChannel, partnerEpr);
- if (supportedStyles.contains(InvocationStyle.RELIABLE)) {
- // If RELIABLE is supported, this is easy, we just do it in-line.
- ReliablePartnerRoleMessageExchangeImpl reliableMex = new ReliablePartnerRoleMessageExchangeImpl(_bpelProcess, mexDao
- .getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr, partnerRoleChannel);
- _contexts.mexContext.invokePartnerReliable(reliableMex);
- } else if (supportedStyles.contains(InvocationStyle.TRANSACTED)) {
- // If TRANSACTED is supported, this is again easy, do it in-line.
- TransactedPartnerRoleMessageExchangeImpl transactedMex = new TransactedPartnerRoleMessageExchangeImpl(_bpelProcess,
- mexDao.getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr, partnerRoleChannel);
- _contexts.mexContext.invokePartnerTransacted(transactedMex);
- } else if (supportedStyles.contains(InvocationStyle.BLOCKING)) {
- // For BLOCKING invocation, we defer the call until after commit (unless idempotent).
- BlockingPartnerRoleMessageExchangeImpl blockingMex = new BlockingPartnerRoleMessageExchangeImpl(_bpelProcess, mexDao
- .getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr, partnerRoleChannel);
- schedule(new BlockingInvoker(blockingMex));
- } else if (supportedStyles.contains(InvocationStyle.ASYNC)) {
- // For ASYNC style, we defer the call until after commit (unless idempotent).
- AsyncPartnerRoleMessageExchangeImpl asyncMex = new AsyncPartnerRoleMessageExchangeImpl(_bpelProcess, mexDao
- .getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr, partnerRoleChannel);
- schedule(new AsyncInvoker(asyncMex));
-
+
+ boolean oneway = MessageExchangePattern.valueOf(mexDao.getPattern()) == MessageExchangePattern.REQUEST_ONLY;
+
+ if (_bpelProcess.isInMemory()) {
+ // In-memory processes are a bit different, we're never going to do any scheduling for them, so we'd
+ // prefer to have TRANSACTED invocation style.
+ if (supportedStyles.contains(InvocationStyle.TRANSACTED)) {
+ // If TRANSACTED is supported, this is again easy, do it in-line.
+ TransactedPartnerRoleMessageExchangeImpl transactedMex = new TransactedPartnerRoleMessageExchangeImpl(_bpelProcess,
+ mexDao.getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr,
+ partnerRoleChannel);
+ _contexts.mexContext.invokePartnerTransacted(transactedMex);
+ } else if (supportedStyles.contains(InvocationStyle.RELIABLE) && oneway) {
+ // We can do RELIABLE for in-mem, but only if they are one way.
+ ReliablePartnerRoleMessageExchangeImpl reliableMex = new ReliablePartnerRoleMessageExchangeImpl(_bpelProcess,
+ mexDao.getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr,
+ partnerRoleChannel);
+ _contexts.mexContext.invokePartnerReliable(reliableMex);
+
+ } else if (supportedStyles.contains(InvocationStyle.UNRELIABLE)) {
+ // Need to cheat a little bit for in-memory processes; do the invoke in-line, but first suspend
+ // the transaction so that the IL does not get confused.
+ Transaction tx;
+ try {
+ tx = _contexts.txManager.suspend();
+ } catch (Exception ex) {
+ throw new BpelEngineException("TxManager Error: cannot suspend!", ex);
+ }
+ try {
+ UnreliablePartnerRoleMessageExchangeImpl unreliableMex = new UnreliablePartnerRoleMessageExchangeImpl(_bpelProcess,
+ mexDao.getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr,
+ partnerRoleChannel);
+ _contexts.mexContext.invokePartnerBlocking(unreliableMex);
+ unreliableMex.waitForResponse();
+ } finally {
+ try {
+ _contexts.txManager.resume(tx);
+ } catch (Exception e) {
+ throw new BpelEngineException("TxManager Error: cannot resume!", e);
+ }
+ }
+ }
} else {
- // This really should not happen, indicates IL is screwy.
- __log.error("Integration Layer did not agree to any known invocation style for EPR " + partnerEpr);
- mexDao.setFailureType(FailureType.COMMUNICATION_ERROR.toString());
- mexDao.setStatus(Status.FAILURE.toString());
- mexDao.setFaultExplanation("NoMatchingStyle");
+ if (supportedStyles.contains(InvocationStyle.TRANSACTED)) {
+
+ // If TRANSACTED is supported, this is again easy, do it in-line. Also, this is what we always do for
+ // in-mem processes (even if the IL claims to not support it.)
+ TransactedPartnerRoleMessageExchangeImpl transactedMex = new TransactedPartnerRoleMessageExchangeImpl(_bpelProcess,
+ mexDao.getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr,
+ partnerRoleChannel);
+ _contexts.mexContext.invokePartnerTransacted(transactedMex);
+ } else if (supportedStyles.contains(InvocationStyle.RELIABLE)) {
+ // If RELIABLE is supported, this is easy, we just do it in-line.
+ ReliablePartnerRoleMessageExchangeImpl reliableMex = new ReliablePartnerRoleMessageExchangeImpl(_bpelProcess,
+ mexDao.getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr,
+ partnerRoleChannel);
+ _contexts.mexContext.invokePartnerReliable(reliableMex);
+ } else if (supportedStyles.contains(InvocationStyle.UNRELIABLE)) {
+ // For UNRELIABLE invocation, we defer the call until after commit (unless idempotent).
+ UnreliablePartnerRoleMessageExchangeImpl blockingMex = new UnreliablePartnerRoleMessageExchangeImpl(_bpelProcess,
+ mexDao.getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr,
+ partnerRoleChannel);
+ // We schedule in-memory (no db) to guarantee "at most once" semantics.
+ schedule(new UnreliableInvoker(blockingMex));
+ // TODO: how do we recover the invocation if system dies in UnreliableInvoker?
+ } else {
+ // This really should not happen, indicates IL is screwy.
+ __log.error("Integration Layer did not agree to any known invocation style for EPR " + partnerEpr);
+ mexDao.setFailureType(FailureType.COMMUNICATION_ERROR.toString());
+ mexDao.setStatus(Status.FAILURE.toString());
+ mexDao.setFaultExplanation("NoMatchingStyle");
+ }
}
}
@@ -844,7 +905,7 @@
else if (target.getSupportedInvocationStyle(serviceName).contains(InvocationStyle.TRANSACTED))
style = InvocationStyle.TRANSACTED;
else
- style = InvocationStyle.BLOCKING;
+ style = InvocationStyle.UNRELIABLE;
} else /* persisted */{
if (operation.getOutput() != null
@@ -869,7 +930,8 @@
if (__log.isDebugEnabled())
__log.debug("INVOKE PARTNER (SEP): sessionId=" + mySessionId + " partnerSessionId=" + partnerSessionId);
- MessageExchangeDAO myRoleMex = _bpelProcess.createMessageExchange(new GUID().toString(),MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
+ MessageExchangeDAO myRoleMex = _bpelProcess.createMessageExchange(new GUID().toString(),
+ MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
myRoleMex.setCallee(serviceName);
myRoleMex.setPipedMessageExchange(partnerRoleMex);
myRoleMex.setOperation(partnerRoleMex.getOperation());
@@ -898,9 +960,8 @@
throw new BpelEngineException("MUST RUN IN TRANSACTION!");
if (_executed)
throw new IllegalStateException("cannot call execute() twice!");
-
+
long maxTime = System.currentTimeMillis() + _maxReductionTimeMs;
-
// Execute the process state reductions
boolean canReduce = true;
@@ -941,6 +1002,7 @@
try {
WorkEvent we = new WorkEvent();
we.setIID(_iid);
+ we.setProcessId(_bpelProcess.getPID());
we.setType(WorkEvent.Type.RESUME);
_contexts.scheduler.schedulePersistedJob(we.getDetail(), new Date());
} catch (ContextException e) {
@@ -1139,15 +1201,7 @@
mexDao.setFaultExplanation(optionalFaultData.toString());
}
mexDao.setFaultExplanation("Process completed without responding.");
-
- switch (istyle) {
- case RELIABLE:
- scheduleReliableResponse(mexDao);
- break;
- case ASYNC:
- scheduleAsyncResponse(mexDao);
- }
-
+ doMyRoleResponse(mexDao, status, Status.FAILURE);
}
}
}
@@ -1351,7 +1405,7 @@
* Attempt to match message exchanges on a correlator.
*
*/
- public void matcherEvent(String correlatorId, CorrelationKey ckey) {
+ void matcherEvent(String correlatorId, CorrelationKey ckey) {
if (BpelProcess.__log.isDebugEnabled()) {
__log.debug("MatcherEvent handling: correlatorId=" + correlatorId + ", ckey=" + ckey);
}
@@ -1387,19 +1441,6 @@
}
}
- /**
- * Add a scheduled ASYNC response.
- *
- * @param messageExchangeId
- */
- private void scheduleAsyncResponse(MessageExchangeDAO mexdao) {
- assert !_bpelProcess.isInMemory() : "Internal error; attempt to schedule in-memory process";
- assert _contexts.isTransacted();
-
- final MyRoleMessageExchangeImpl mex = _bpelProcess.recreateMyRoleMex(mexdao);
- _pendingMyRoleReplies.add(mex);
- }
-
private void scheduleReliableResponse(MessageExchangeDAO messageExchange) {
assert !_bpelProcess.isInMemory() : "Internal error; attempt to schedule in-memory process";
@@ -1422,31 +1463,52 @@
private void continuePartnerReplied(MessageExchangeDAO pmex) {
}
-
- class BlockingInvoker implements Runnable {
- public BlockingInvoker(BlockingPartnerRoleMessageExchangeImpl blockingMex) {
- // TODO Auto-generated constructor stub
+ /**
+ * Runnable that actually performs UNRELIABLE invokes on the partner.
+ *
+ * @author Maciej Szefler <mszefler at gmail dot com>
+ *
+ */
+ class UnreliableInvoker implements Runnable {
+
+ UnreliablePartnerRoleMessageExchangeImpl _blockingMex;
+
+ public UnreliableInvoker(UnreliablePartnerRoleMessageExchangeImpl blockingMex) {
+ _blockingMex = blockingMex;
}
public void run() {
- // TODO Auto-generated method stub
-
+ assert !_contexts.isTransacted();
+
+ // TODO: what happens if system fails right here? we'll need to add a "retry" possibility
+
+ Runnable prc;
+ try {
+ _contexts.mexContext.invokePartnerBlocking(_blockingMex);
+ prc = new PartnerResponseContinuation(_blockingMex);
+ } catch (Exception ce) {
+ prc = new PartnerResponseContinuation(_blockingMex);
+ }
+
+ // Keep using the same thread to do the work, but note we need to run this in a transaction.
+ _instanceWorker.enqueue(_bpelProcess._server.new TransactedRunnable(prc));
}
-
+
}
-
- class AsyncInvoker implements Runnable {
- public AsyncInvoker(AsyncPartnerRoleMessageExchangeImpl asyncMex) {
- // TODO Auto-generated constructor stub
+ class PartnerResponseContinuation implements Runnable {
+
+ private UnreliablePartnerRoleMessageExchangeImpl _mex;
+
+ public PartnerResponseContinuation(UnreliablePartnerRoleMessageExchangeImpl blockingMex) {
+ _mex = blockingMex;
}
public void run() {
- // TODO Auto-generated method stub
-
+
}
-
+
}
}
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java?view=diff&rev=562569&r1=562568&r2=562569
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/DebuggerSupport.java Fri Aug 3 13:07:33 2007
@@ -171,6 +171,7 @@
WorkEvent we = new WorkEvent();
we.setIID(iid);
+ we.setProcessId(_process.getPID());
we.setType(WorkEvent.Type.RESUME);
_process._contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
@@ -294,6 +295,7 @@
WorkEvent we = new WorkEvent();
we.setType(WorkEvent.Type.RESUME);
+ we.setProcessId(_process.getPID());
we.setIID(iid);
_process._contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java?view=diff&rev=562569&r1=562568&r2=562569
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java Fri Aug 3 13:07:33 2007
@@ -218,7 +218,6 @@
}
void serverFailed(FailureType type, String reason, Element details) {
- // TODO not using FailureType, nor details
_failureType = type;
_explanation = reason;
setStatus(Status.FAILURE);
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java?view=diff&rev=562569&r1=562568&r2=562569
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java Fri Aug 3 13:07:33 2007
@@ -175,6 +175,7 @@
protected WorkEvent generateInvokeResponseWorkEvent() {
WorkEvent we = new WorkEvent();
+ we.setProcessId(_process.getPID());
we.setIID(_iid);
we.setType(WorkEvent.Type.PARTNER_RESPONSE);
we.setChannel(_responseChannel);
@@ -189,9 +190,6 @@
if (getStatus() != MessageExchange.Status.REQUEST && getStatus() != MessageExchange.Status.ASYNC)
throw new BpelEngineException("Invalid message exchange state, expect REQUEST or ASYNC, but got " + getStatus());
- // In-memory processe are special, they don't allow scheduling so any replies must be delivered immediately.
- if (!_blocked && _process.isInMemory())
- throw new BpelEngineException("Cannot reply to in-memory process outside of BLOCKING call");
}
}
Copied: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java (from r562188, ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java)
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java?view=diff&rev=562569&p1=ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java&r1=562188&p2=ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java&r2=562569
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/AsyncMyRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java Fri Aug 3 13:07:33 2007
@@ -11,8 +11,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.ode.bpel.dao.MessageDAO;
import org.apache.ode.bpel.dao.MessageExchangeDAO;
+import org.apache.ode.bpel.iapi.BpelEngineException;
import org.apache.ode.bpel.iapi.InvocationStyle;
import org.apache.ode.bpel.o.OPartnerLink;
@@ -22,12 +22,13 @@
* @author Maciej Szefler <mszefler at gmail dot com>
*
*/
-public class AsyncMyRoleMessageExchangeImpl extends MyRoleMessageExchangeImpl {
+public class UnreliableMyRoleMessageExchangeImpl extends MyRoleMessageExchangeImpl {
private static final Log __log = LogFactory.getLog(ReliableMyRoleMessageExchangeImpl.class);
+ boolean _done = false;
ResponseFuture _future;
- public AsyncMyRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation operation, QName callee) {
+ public UnreliableMyRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation operation, QName callee) {
super(process, mexId, oplink, operation, callee);
}
@@ -59,7 +60,7 @@
_process.enqueueTransaction(new Callable<Void>() {
public Void call() throws Exception {
- AsyncMyRoleMessageExchangeImpl.super.setStatus(Status.REQUEST);
+ UnreliableMyRoleMessageExchangeImpl.super.setStatus(Status.REQUEST);
MessageExchangeDAO dao = _process.createMessageExchange(getMessageExchangeId(), MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
save(dao);
if (_process.isInMemory())
@@ -76,6 +77,32 @@
}
+
+ @Override
+ public InvocationStyle getInvocationStyle() {
+ return InvocationStyle.UNRELIABLE;
+ }
+
+
+ @Override
+ public Status invokeBlocking() throws BpelEngineException, TimeoutException {
+ if (_done)
+ return getStatus();
+
+ Future<Status> future = _future != null ? _future : super.invokeAsync();
+
+ try {
+ future.get(Math.max(_timeout,1), TimeUnit.MILLISECONDS);
+ _done = true;
+ return getStatus();
+ } catch (InterruptedException e) {
+ throw new BpelEngineException(e);
+ } catch (ExecutionException e) {
+ throw new BpelEngineException(e.getCause());
+ }
+ }
+
+
private static class ResponseFuture implements Future<Status> {
private Status _status;
@@ -121,11 +148,6 @@
this.notifyAll();
}
}
- }
-
- @Override
- public InvocationStyle getInvocationStyle() {
- return InvocationStyle.ASYNC;
}
}
Copied: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java (from r561873, ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingPartnerRoleMessageExchangeImpl.java)
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java?view=diff&rev=562569&p1=ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingPartnerRoleMessageExchangeImpl.java&r1=561873&p2=ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java&r2=562569
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BlockingPartnerRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java Fri Aug 3 13:07:33 2007
@@ -2,46 +2,109 @@
import javax.wsdl.Operation;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
+import org.apache.ode.bpel.engine.MessageExchangeImpl.InDbAction;
import org.apache.ode.bpel.iapi.BpelEngineException;
import org.apache.ode.bpel.iapi.EndpointReference;
import org.apache.ode.bpel.iapi.InvocationStyle;
import org.apache.ode.bpel.iapi.MessageExchangeContext;
import org.apache.ode.bpel.iapi.PartnerRoleChannel;
import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
+import org.apache.ode.bpel.iapi.MessageExchange.Status;
import org.apache.ode.bpel.o.OPartnerLink;
/**
* Implementation of the {@link PartnerRoleMessageExchange} interface that is passed to the IL when the
- * BLOCKING invocation style is used (see {@link InvocationStyle#BLOCKING}). The basic idea here is that
- * with this style, the IL performs the operation while blocking in the
- * {@link MessageExchangeContext#invokePartner(org.apache.ode.bpel.iapi.PartnerRoleMessageExchange)} method.
+ * UNRELIABLE invocation style is used (see {@link InvocationStyle#UNRELIABLE}). The basic idea here is
+ * that with this style, the IL performs the operation outside of a transactional context. It can either
+ * finish it right away (BLOCK) or indicate that the response will be provided later (replyASYNC).
*
- * This InvocationStyle makes this class rather trivial.
+ *
+ * TODO: serious synchronization issues in this class.
*
* @author Maciej Szefler <mszefler at gmail dot com>
*
*/
-public class BlockingPartnerRoleMessageExchangeImpl extends PartnerRoleMessageExchangeImpl {
+public class UnreliablePartnerRoleMessageExchangeImpl extends PartnerRoleMessageExchangeImpl {
+ private static final Log __log = LogFactory.getLog(UnreliablePartnerRoleMessageExchangeImpl.class);
+
- BlockingPartnerRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation operation, EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel channel) {
+ UnreliablePartnerRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation operation, EndpointReference epr, EndpointReference myRoleEPR, PartnerRoleChannel channel) {
super(process, mexId, oplink, operation, epr, myRoleEPR, channel);
}
- /**
- * The criteria for issuing a replyXXX call on BLOCKING message exchanges is that the replyXXX must come while the
- * engine is blocked in an
- * {@link MessageExchangeContext#invokePartnerBlocking(org.apache.ode.bpel.iapi.PartnerRoleMessageExchange)}.
- * method.
- */
+
+ @Override
+ public InvocationStyle getInvocationStyle() {
+ return InvocationStyle.UNRELIABLE;
+ }
+
+
+ @Override
+ protected void resumeInstance() {
+ assert !_contexts.isTransacted() : "checkReplyContext() should have prevented us from getting here.";
+ assert !_process.isInMemory() : "resumeInstance() for in-mem processes makes no sense.";
+
+ final WorkEvent we = generateInvokeResponseWorkEvent();
+ if (__log.isDebugEnabled()) {
+ __log.debug("resumeInstance: scheduling WorkEvent " + we);
+ }
+
+
+ doInTX(new InDbAction<Void>() {
+
+ public Void call(MessageExchangeDAO mexdao) {
+ save(mexdao);
+ _contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
+ return null;
+ }
+ });
+ }
+
@Override
protected void checkReplyContextOk() {
- if (!_blocked)
+ super.checkReplyContextOk();
+
+ if (!_blocked && getStatus() != Status.ASYNC)
throw new BpelEngineException("replyXXX operation attempted outside of BLOCKING region!");
+
+ // Prevent user from attempting the replyXXXX calls while a transaction is active.
+ if (_contexts.isTransacted())
+ throw new BpelEngineException("Cannot reply to UNRELIABLE style invocation from a transactional context!");
+
+
}
+
+
@Override
- public InvocationStyle getInvocationStyle() {
- return InvocationStyle.BLOCKING;
+ public void replyAsync(String foreignKey) {
+ if (__log.isDebugEnabled())
+ __log.debug("replyAsync mex=" + _mexId);
+
+ sync();
+
+ if (!_blocked)
+ throw new BpelEngineException("Invalid context for replyAsync(); can only be called during MessageExchangeContext call. ");
+
+ // TODO: shouldn't this set _blocked?
+
+ checkReplyContextOk();
+ setStatus(Status.ASYNC);
+ _foreignKey = foreignKey;
+ sync();
+
+ }
+
+
+ /**
+ * Method used by server to wait until a response is available.
+ */
+ Status waitForResponse() {
+ // TODO: actually wait for response.
+ return getStatus();
}