You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2007/08/06 22:47:59 UTC
svn commit: r563267 [1/2] - in
/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel: engine/
memdao/
Author: mszefler
Date: Mon Aug 6 13:47:58 2007
New Revision: 563267
URL: http://svn.apache.org/viewvc?view=rev&rev=563267
Log:
BART, some additinal refactorings. New model to fix concurrency problems in Partner invokes.
Added:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MexDaoUtil.java (with props)
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeCache.java (with props)
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkRoleImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliableMyRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ReliablePartnerRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedMyRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/TransactedPartnerRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliablePartnerRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=563267&r1=563266&r2=563267
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Mon Aug 6 13:47:58 2007
@@ -54,6 +54,7 @@
import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
import org.apache.ode.bpel.iapi.PartnerRoleChannel;
import org.apache.ode.bpel.iapi.ProcessConf;
+import org.apache.ode.bpel.iapi.MessageExchange.AckType;
import org.apache.ode.bpel.iapi.MessageExchange.Status;
import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus;
import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
@@ -152,7 +153,7 @@
// TODO : do this on a per-partnerlink basis, support transacted styles.
HashSet<InvocationStyle> istyles = new HashSet<InvocationStyle>();
istyles.add(InvocationStyle.UNRELIABLE);
-
+
if (!conf.isTransient()) {
istyles.add(InvocationStyle.RELIABLE);
} else {
@@ -185,18 +186,40 @@
_hydrationLatch.latch(1);
try {
+ // The following check is mostly for sanity purposes. MexImpls should prevent this from
+ // happening.
PartnerLinkMyRoleImpl target = getMyRoleForService(mexdao.getCallee());
+ Status oldstatus = Status.valueOf(mexdao.getStatus());
if (target == null) {
String errmsg = __msgs.msgMyRoleRoutingFailure(mexdao.getMessageExchangeId());
__log.error(errmsg);
mexdao.setFailureType(MessageExchange.FailureType.UNKNOWN_ENDPOINT.toString());
mexdao.setFaultExplanation(errmsg);
- Status oldstatus = Status.valueOf(mexdao.getStatus());
- mexdao.setStatus(Status.FAILURE.toString());
- fireMexStateEvent(mexdao, oldstatus, Status.FAILURE);
+ mexdao.setStatus(Status.ACK.toString());
+ mexdao.setAckType(AckType.FAILURE);
+ fireMexStateEvent(mexdao, oldstatus, Status.ACK);
+ return;
+ }
+
+ Operation op = target._plinkDef.getMyRoleOperation(mexdao.getOperation());
+ if (op == null) {
+ String errmsg = __msgs.msgMyRoleRoutingFailure(mexdao.getMessageExchangeId());
+ __log.error(errmsg);
+ mexdao.setFailureType(MessageExchange.FailureType.UNKNOWN_OPERATION.toString());
+ mexdao.setFaultExplanation(errmsg);
+ mexdao.setStatus(Status.ACK.toString());
+ mexdao.setAckType(AckType.FAILURE);
+ fireMexStateEvent(mexdao, oldstatus, Status.ACK);
return;
}
+ // "Acknowledge" any one-way invokes
+ if (op.getOutput() == null) {
+ mexdao.setStatus(Status.ACK.toString());
+ mexdao.setAckType(AckType.ONEWAY);
+ fireMexStateEvent(mexdao, oldstatus, Status.ACK);
+ }
+
mexdao.setProcess(getProcessDAO());
// TODO: fix this
@@ -220,7 +243,7 @@
} else if (cstatus == CorrelationStatus.MATCHED) {
doInstanceWork(mexdao.getInstance().getInstanceId(), new Callable<Void>() {
public Void call() {
- executeContinueInstance(mexdao);
+ executeContinueInstanceMyRoleRequestReceived(mexdao);
return null;
}
});
@@ -233,13 +256,9 @@
_hydrationLatch.release(1);
}
- // TODO: relocate this code // For a one way, once the engine is done, the mex can be safely released.
- // if (mex.getMessageExchangePattern().equals(MessageExchange.MessageExchangePattern.REQUEST_ONLY)) {
- // mex.release();
- // }
}
- private void executeCreateInstance(MessageExchangeDAO mexdao) {
+ void executeCreateInstance(MessageExchangeDAO mexdao) {
assert _hydrationLatch.isLatched(1);
BpelInstanceWorker worker = _instanceWorkerCache.get(mexdao.getInstance().getInstanceId());
@@ -249,11 +268,12 @@
instanceCtx.execute();
}
- private void executeContinueInstance(MessageExchangeDAO mexdao) {
+ void executeContinueInstanceMyRoleRequestReceived(MessageExchangeDAO mexdao) {
assert _hydrationLatch.isLatched(1);
BpelInstanceWorker worker = _instanceWorkerCache.get(mexdao.getInstance().getInstanceId());
assert worker.isWorkerThread();
+
BpelRuntimeContextImpl instance = new BpelRuntimeContextImpl(worker, mexdao.getInstance(), null, null);
int amp = mexdao.getChannel().indexOf('&');
String groupId = mexdao.getChannel().substring(0, amp);
@@ -262,6 +282,20 @@
instance.execute();
}
+ void executeContinueInstancePartnerRoleResponseReceived(MessageExchangeDAO mexdao) {
+ assert _hydrationLatch.isLatched(1);
+ BpelInstanceWorker worker = _instanceWorkerCache.get(mexdao.getInstance().getInstanceId());
+ assert worker.isWorkerThread();
+
+// TODO: we need a way to check if the lastBRC is indeed the lastBRC (serial number on the instanceDAO)
+// BpelRuntimeContextImpl brc = lastBRC == null ? new BpelRuntimeContextImpl(worker, mexdao.getInstance(), null, null)
+// : new BpelRuntimeContextImpl(worker, mexdao.getInstance(), lastBRC);
+ BpelRuntimeContextImpl brc = new BpelRuntimeContextImpl(worker, mexdao.getInstance(), null, null);
+
+ brc.injectPartnerResponse(mexdao.getMessageExchangeId(), mexdao.getChannel());
+ brc.execute();
+ }
+
private void enqueueInstanceWork(Long instanceId, Runnable runnable) {
BpelInstanceWorker iworker = _instanceWorkerCache.get(instanceId);
iworker.enqueue(runnable);
@@ -278,7 +312,7 @@
* @param instanceId
* @param name
*/
- private void scheduleInstanceWork(final Long instanceId, final Runnable runnable) {
+ void scheduleInstanceWork(final Long instanceId, final Runnable runnable) {
_contexts.registerCommitSynchronizer(new Runnable() {
public void run() {
BpelInstanceWorker iworker = _instanceWorkerCache.get(instanceId);
@@ -438,40 +472,40 @@
BpelInstanceWorker worker = _instanceWorkerCache.get(we.getIID());
assert worker.isWorkerThread();
- ProcessInstanceDAO procInstance = getProcessDAO().getInstance(we.getIID());
- if (procInstance == null) {
+ ProcessInstanceDAO instanceDAO = getProcessDAO().getInstance(we.getIID());
+ if (instanceDAO == null) {
if (__log.isDebugEnabled()) {
__log.debug("handleWorkEvent: no ProcessInstance found with iid " + we.getIID() + "; ignoring.");
}
return;
}
- BpelRuntimeContextImpl processInstance = new BpelRuntimeContextImpl(worker, procInstance, null, null);
+ BpelRuntimeContextImpl brc = new BpelRuntimeContextImpl(worker, instanceDAO, null, null);
switch (we.getType()) {
case TIMER:
if (__log.isDebugEnabled()) {
- __log.debug("handleWorkEvent: TimerWork event for process instance " + processInstance);
+ __log.debug("handleWorkEvent: TimerWork event for process instance " + brc);
}
- processInstance.timerEvent(we.getChannel());
+ brc.timerEvent(we.getChannel());
break;
case RESUME:
if (__log.isDebugEnabled()) {
__log.debug("handleWorkEvent: ResumeWork event for iid " + we.getIID());
}
- processInstance.execute();
+ brc.execute();
break;
case PARTNER_RESPONSE:
if (__log.isDebugEnabled()) {
__log.debug("InvokeResponse event for iid " + we.getIID());
}
- processInstance.injectPartnerResponse(we.getMexId(), we.getChannel());
- processInstance.execute();
+ brc.injectPartnerResponse(we.getMexId(), we.getChannel());
+ brc.execute();
break;
case MATCHER:
if (__log.isDebugEnabled()) {
__log.debug("Matcher event for iid " + we.getIID());
}
- processInstance.matcherEvent(we.getCorrelatorId(), we.getCorrelationKey());
+ brc.matcherEvent(we.getCorrelatorId(), we.getCorrelationKey());
}
}
@@ -756,36 +790,12 @@
}
PartnerRoleMessageExchangeImpl createPartnerRoleMex(MessageExchangeDAO mexdao) {
- InvocationStyle istyle = InvocationStyle.valueOf(mexdao.getInvocationStyle());
- PartnerRoleMessageExchangeImpl mex;
+
_hydrationLatch.latch(1);
try {
OPartnerLink plink = (OPartnerLink) _oprocess.getChild(mexdao.getPartnerLinkModelId());
- Operation op = plink.getPartnerRoleOperation(mexdao.getOperation());
- switch (istyle) {
- case UNRELIABLE:
- mex = new UnreliablePartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), plink, op, null, /* EPR todo */
- plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
- break;
- case TRANSACTED:
- mex = new TransactedPartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), plink, op, null, /*
- * EPR
- * todo
- */
- plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
- break;
- case RELIABLE:
- mex = new ReliablePartnerRoleMessageExchangeImpl(this, mexdao.getMessageExchangeId(), plink, op, null, /* EPR todo */
- plink.hasMyRole() ? getInitialMyRoleEPR(plink) : null, getPartnerRoleChannel(plink));
- break;
-
- default:
- throw new BpelEngineException("Unexpected InvocationStyle: " + istyle);
-
- }
-
- mex.load(mexdao);
- return mex;
+ PartnerLinkPartnerRoleImpl prole = _partnerRoles.get(plink);
+ return prole.createPartnerRoleMex(mexdao);
} finally {
_hydrationLatch.release(1);
}
@@ -887,6 +897,124 @@
}
}
+ MessageExchangeDAO createMessageExchange(String mexId, final char dir) {
+ if (isInMemory()) {
+ return _inMemDao.getConnection().createMessageExchange(mexId, dir);
+ } else {
+ return _contexts.dao.getConnection().createMessageExchange(mexId, dir);
+ }
+ }
+
+ MessageExchangeDAO getInMemMexDAO(String mexId) {
+ return _inMemDao.getConnection().getMessageExchange(mexId);
+ }
+
+ /**
+ * Schedule process-level work. This method defers to the server to do the scheduling and wraps the {@link Runnable} in a
+ * try-finally block that ensures that the process is hydrated.
+ *
+ * @param runnable
+ */
+ void scheduleRunnable(final Runnable runnable) {
+ if (__log.isDebugEnabled())
+ __log.debug("schedulingRunnable for process " + _pid + ": " + runnable);
+
+ _server.scheduleRunnable(new ProcessRunnable(runnable));
+ }
+
+ void enqueueRunnable(BpelInstanceWorker worker) {
+ if (__log.isDebugEnabled())
+ __log.debug("enqueuRunnable for process " + _pid + ": " + worker);
+
+ _server.enqueueRunnable(new ProcessRunnable(worker));
+ }
+
+ MyRoleMessageExchange createNewMyRoleMex(final InvocationStyle istyle, final QName targetService, final String operation,
+ final String clientKey) {
+
+ final String mexId = new GUID().toString();
+ _hydrationLatch.latch(1);
+ try {
+
+ final PartnerLinkMyRoleImpl target = getPartnerLinkForService(targetService);
+ if (target == null)
+ throw new BpelEngineException("NoSuchService: " + targetService);
+ final Operation op = target._plinkDef.getMyRoleOperation(operation);
+ if (op == null)
+ throw new BpelEngineException("NoSuchOperation: " + operation);
+
+ return newMyRoleMex(istyle, mexId, target._endpoint.serviceName, target._plinkDef, op);
+
+ } finally {
+ _hydrationLatch.release(1);
+ }
+ }
+
+ void registerMyRoleMex(MyRoleMessageExchangeImpl mymex) {
+ _mexStateListeners.add(new WeakReference<MyRoleMessageExchangeImpl>(mymex));
+ }
+
+ void unregisterMyRoleMex(MyRoleMessageExchangeImpl mymex) {
+ ArrayList<WeakReference<MyRoleMessageExchangeImpl>> needsRemoval = new ArrayList<WeakReference<MyRoleMessageExchangeImpl>>();
+ for (WeakReference<MyRoleMessageExchangeImpl> wref : _mexStateListeners) {
+ MyRoleMessageExchangeImpl mex = wref.get();
+ if (mex == null || mex == mymex)
+ needsRemoval.add(wref);
+ }
+ _mexStateListeners.removeAll(needsRemoval);
+
+ }
+
+ void fireMexStateEvent(MessageExchangeDAO mexdao, Status old, Status news) {
+ // TODO: force a myrole mex to be created if it is not in cache.
+
+ if (old != news)
+ for (WeakReference<MyRoleMessageExchangeImpl> wr : _mexStateListeners) {
+ MyRoleMessageExchangeImpl mymex = wr.get();
+ if (mymex != null && mymex.getMessageExchangeId() != null)
+ mymex.onStateChanged(mexdao, old, news);
+ }
+
+ }
+
+ class ProcessRunnable implements Runnable {
+ Runnable _work;
+
+ ProcessRunnable(Runnable work) {
+ _work = work;
+ }
+
+ public void run() {
+ _hydrationLatch.latch(1);
+ try {
+ _work.run();
+ } finally {
+ _hydrationLatch.release(1);
+ }
+
+ }
+
+ }
+
+ class ProcessCallable<T> implements Callable<T> {
+ Callable<T> _work;
+
+ ProcessCallable(Callable<T> work) {
+ _work = work;
+ }
+
+ public T call() throws Exception {
+ _hydrationLatch.latch(1);
+ try {
+ return _work.call();
+ } finally {
+ _hydrationLatch.release(1);
+ }
+
+ }
+
+ }
+
class HydrationLatch extends NStateLatch {
HydrationLatch() {
@@ -994,121 +1122,21 @@
}
- MessageExchangeDAO createMessageExchange(String mexId, final char dir) {
- if (isInMemory()) {
- return _inMemDao.getConnection().createMessageExchange(mexId, dir);
- } else {
- return _contexts.dao.getConnection().createMessageExchange(mexId, dir);
- }
- }
-
- MessageExchangeDAO getInMemMexDAO(String mexId) {
- return _inMemDao.getConnection().getMessageExchange(mexId);
- }
-
/**
- * Schedule process-level work. This method defers to the server to do the scheduling and wraps the {@link Runnable} in a
- * try-finally block that ensures that the process is hydrated.
+ * Invoke a partner via the integration layer.
*
- * @param runnable
+ * @param mexDao
+ * @param brc
*/
- void scheduleRunnable(final Runnable runnable) {
- if (__log.isDebugEnabled())
- __log.debug("schedulingRunnable for process " + _pid + ": " + runnable);
-
- _server.scheduleRunnable(new ProcessRunnable(runnable));
- }
-
- public void enqueueRunnable(BpelInstanceWorker worker) {
- if (__log.isDebugEnabled())
- __log.debug("enqueuRunnable for process " + _pid + ": " + worker);
-
- _server.enqueueRunnable(new ProcessRunnable(worker));
- }
-
- class ProcessRunnable implements Runnable {
- Runnable _work;
-
- ProcessRunnable(Runnable work) {
- _work = work;
- }
-
- public void run() {
- _hydrationLatch.latch(1);
- try {
- _work.run();
- } finally {
- _hydrationLatch.release(1);
- }
-
- }
-
- }
-
- class ProcessCallable<T> implements Callable<T> {
- Callable<T> _work;
-
- ProcessCallable(Callable<T> work) {
- _work = work;
- }
-
- public T call() throws Exception {
- _hydrationLatch.latch(1);
- try {
- return _work.call();
- } finally {
- _hydrationLatch.release(1);
- }
-
- }
-
- }
-
- public MyRoleMessageExchange createNewMyRoleMex(final InvocationStyle istyle, final QName targetService,
- final String operation, final String clientKey) {
-
- final String mexId = new GUID().toString();
+ void invokeIL(MessageExchangeDAO mexDao) {
_hydrationLatch.latch(1);
try {
-
- final PartnerLinkMyRoleImpl target = getPartnerLinkForService(targetService);
- if (target == null)
- throw new BpelEngineException("NoSuchService: " + targetService);
- final Operation op = target._plinkDef.getMyRoleOperation(operation);
- if (op == null)
- throw new BpelEngineException("NoSuchOperation: " + operation);
-
- return newMyRoleMex(istyle, mexId, target._endpoint.serviceName, target._plinkDef, op);
-
+ OPartnerLink oplink = (OPartnerLink) _oprocess.getChild(mexDao.getPartnerLinkModelId());
+ PartnerLinkPartnerRoleImpl partnerRole = _partnerRoles.get(oplink);
+ partnerRole.invokeIL(mexDao);
} finally {
_hydrationLatch.release(1);
}
}
- void registerMyRoleMex(MyRoleMessageExchangeImpl mymex) {
- _mexStateListeners.add(new WeakReference<MyRoleMessageExchangeImpl>(mymex));
- }
-
- void unregisterMyRoleMex(MyRoleMessageExchangeImpl mymex) {
- ArrayList<WeakReference<MyRoleMessageExchangeImpl>> needsRemoval = new ArrayList<WeakReference<MyRoleMessageExchangeImpl>>();
- for (WeakReference<MyRoleMessageExchangeImpl> wref : _mexStateListeners) {
- MyRoleMessageExchangeImpl mex = wref.get();
- if (mex == null || mex == mymex)
- needsRemoval.add(wref);
- }
- _mexStateListeners.removeAll(needsRemoval);
-
- }
-
- void fireMexStateEvent(MessageExchangeDAO mexdao, Status old, Status news) {
- // TODO: force a myrole mex to be created if it is not in cache.
-
- if (old != news)
- for (WeakReference<MyRoleMessageExchangeImpl> wr : _mexStateListeners) {
- MyRoleMessageExchangeImpl mymex = wr.get();
- if (mymex != null && mymex.getMessageExchangeId() != null)
- mymex.onStateChanged(mexdao, old, news);
- }
-
- }
}
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?view=diff&rev=563267&r1=563266&r2=563267
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Mon Aug 6 13:47:58 2007
@@ -25,13 +25,8 @@
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
-import java.util.Set;
-import javax.transaction.InvalidTransactionException;
-import javax.transaction.SystemException;
-import javax.transaction.Transaction;
import javax.wsdl.Operation;
-import javax.wsdl.PortType;
import javax.xml.namespace.QName;
import org.apache.commons.logging.Log;
@@ -62,6 +57,7 @@
import org.apache.ode.bpel.iapi.InvocationStyle;
import org.apache.ode.bpel.iapi.MessageExchange;
import org.apache.ode.bpel.iapi.PartnerRoleChannel;
+import org.apache.ode.bpel.iapi.MessageExchange.AckType;
import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
import org.apache.ode.bpel.iapi.MessageExchange.Status;
@@ -92,7 +88,6 @@
import org.apache.ode.utils.GUID;
import org.apache.ode.utils.Namespaces;
import org.apache.ode.utils.ObjectPrinter;
-import org.omg.CORBA._PolicyStub;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
@@ -119,9 +114,6 @@
private MessageExchangeDAO _instantiatingMessageExchange;
- /** Object for keeping track of all the outstanding <pick>/<receive> activities */
- private OutstandingRequestManager _outstandingRequests;
-
/** List of pending invocations that need to be deferred until the end of the current TX */
private List<PartnerRoleMessageExchangeImpl> _pendingPartnerRoleInvokes = new LinkedList<PartnerRoleMessageExchangeImpl>();
@@ -139,22 +131,26 @@
private boolean _executed;
- public BpelRuntimeContextImpl(BpelInstanceWorker instanceWorker, ProcessInstanceDAO dao, PROCESS PROCESS,
+ /**
+ * Construct a BRC using the soup from the previous BRC. This is handy as it allows us to eliminate the DB read of the soup,
+ * when we know the soup has not changed since the last TX.
+ *
+ * @param instanceWorker
+ * @param instanceDao
+ * @param lastBRC
+ */
+ BpelRuntimeContextImpl(BpelInstanceWorker instanceWorker, ProcessInstanceDAO instanceDao, BpelRuntimeContextImpl lastBRC) {
+ this(instanceWorker, instanceDao, lastBRC._soup);
+ }
+
+ BpelRuntimeContextImpl(BpelInstanceWorker instanceWorker, ProcessInstanceDAO dao, PROCESS PROCESS,
MessageExchangeDAO instantiatingMessageExchange) {
- _instanceWorker = instanceWorker;
- _bpelProcess = instanceWorker._process;
- _contexts = instanceWorker._contexts;
- _dao = dao;
- _iid = dao.getInstanceId();
+
+ this(instanceWorker, dao, new ExecutionQueueImpl(null));
_instantiatingMessageExchange = instantiatingMessageExchange;
- _vpu = new JacobVPU();
- _vpu.registerExtension(BpelRuntimeContext.class, this);
- _soup = new ExecutionQueueImpl(null);
_soup.setReplacementMap(_bpelProcess.getReplacementMap());
- _outstandingRequests = new OutstandingRequestManager();
- _vpu.setContext(_soup);
-
+ _soup.setGlobalData(new OutstandingRequestManager());
byte[] daoState = _bpelProcess.isInMemory() ? null : dao.getExecutionState();
if (daoState != null) {
assert !_bpelProcess.isInMemory() : "did not expect to rehydrate in-mem process!";
@@ -164,13 +160,23 @@
} catch (Exception ex) {
throw new RuntimeException(ex);
}
- _outstandingRequests = (OutstandingRequestManager) _soup.getGlobalData();
}
-
if (PROCESS != null) {
_vpu.inject(PROCESS);
}
+ }
+
+ BpelRuntimeContextImpl(BpelInstanceWorker instanceWorker, ProcessInstanceDAO dao, ExecutionQueueImpl soup) {
+ _instanceWorker = instanceWorker;
+ _bpelProcess = instanceWorker._process;
+ _contexts = instanceWorker._contexts;
+ _dao = dao;
+ _iid = dao.getInstanceId();
+ _vpu = new JacobVPU();
+ _vpu.registerExtension(BpelRuntimeContext.class, this);
+ _soup = soup;
+ _vpu.setContext(_soup);
if (BpelProcess.__log.isDebugEnabled()) {
__log.debug("BpelRuntimeContextImpl created for instance " + _iid + ". INDEXED STATE=" + _soup.getIndex());
}
@@ -330,11 +336,11 @@
correlators.add(processDao.getCorrelator(correlatorId));
}
- int conflict = _outstandingRequests.findConflict(selectors);
+ int conflict = getORM().findConflict(selectors);
if (conflict != -1)
throw new FaultException(_bpelProcess.getOProcess().constants.qnConflictingReceive, selectors[conflict].toString());
- _outstandingRequests.register(pickResponseChannelStr, selectors);
+ getORM().register(pickResponseChannelStr, selectors);
// TODO - ODE-58
@@ -508,7 +514,7 @@
public void reply(final PartnerLinkInstance plinkInstnace, final String opName, final String mexId, Element msg, QName fault)
throws FaultException {
- String mexRef = _outstandingRequests.release(plinkInstnace, opName, mexId);
+ String mexRef = getORM().release(plinkInstnace, opName, mexId);
if (mexRef == null) {
throw new FaultException(_bpelProcess.getOProcess().constants.qnMissingRequest);
@@ -530,21 +536,20 @@
myrolemex.setResponse(message);
- Status status;
-
+ AckType ackType;
if (fault != null) {
- status = Status.FAULT;
+ ackType = AckType.FAULT;
myrolemex.setFault(fault);
evt.setAspect(ProcessMessageExchangeEvent.PROCESS_FAULT);
} else {
- status = Status.RESPONSE;
+ ackType = AckType.RESPONSE;
evt.setAspect(ProcessMessageExchangeEvent.PROCESS_OUTPUT);
}
Status previousStatus = Status.valueOf(myrolemex.getStatus());
- myrolemex.setStatus(status.toString());
-
- doMyRoleResponse(myrolemex, previousStatus, status);
+ myrolemex.setStatus(Status.ACK.toString());
+ myrolemex.setAckType(ackType);
+ doMyRoleResponse(myrolemex, previousStatus, Status.ACK);
sendEvent(evt);
}
@@ -559,7 +564,8 @@
}
/**
- * Handle P2P responses.
+ * Handle P2P responses.
+ *
* @param myrolemex
*/
private void p2pResponse(MessageExchangeDAO myrolemex) {
@@ -683,6 +689,8 @@
public String invoke(PartnerLinkInstance partnerLink, Operation operation, Element outgoingMessage,
InvokeResponseChannel channel) throws FaultException {
+ // TODO: move a lot of this into BpelProcess
+
// Get the Integration Layer's communication channel for the partnerlink.
PartnerRoleChannel partnerRoleChannel = _bpelProcess.getPartnerRoleChannel(partnerLink.partnerLink);
@@ -741,27 +749,10 @@
if (p2pProcess != null) {
/* P2P (process-to-process) invocation, special logic */
- invokeP2P(p2pProcess, partnerEndpoint.serviceName, operation, outgoingMessage, mexDao);
+ invokeP2P(p2pProcess, partnerEndpoint.serviceName, operation, mexDao);
} else {
/* NOT p2p, need to call out to IL */
- invokeIL(partnerLink, operation, outgoingMessage, partnerRoleChannel, partnerEpr, mexDao);
- }
-
- // In case a response/fault was available right away, which will happen for BLOCKING/TRANSACTED invocations,
- // we need to inject a message on the response channel, so that the process continues.
- switch (Status.valueOf(mexDao.getStatus())) {
- case ASYNC:
- break;
- case RESPONSE:
- case FAULT:
- case FAILURE:
- injectPartnerResponse(mexDao.getMessageExchangeId(), mexDao.getChannel());
- break;
- default:
- __log.error("Partner did not acknowledge message exchange: " + mexDao.getMessageExchangeId());
- mexDao.setStatus(Status.FAILURE.toString());
- mexDao.setFailureType(FailureType.NO_RESPONSE.toString());
- injectPartnerResponse(mexDao.getMessageExchangeId(), mexDao.getChannel());
+ invokeIL(partnerLink, operation, partnerRoleChannel, partnerEpr, mexDao);
}
return mexDao.getMessageExchangeId();
@@ -778,107 +769,39 @@
* @param partnerEpr
* @param mexDao
*/
- private void invokeIL(PartnerLinkInstance partnerLink, Operation operation, Element outgoingMessage,
- PartnerRoleChannel partnerRoleChannel, EndpointReference partnerEpr, MessageExchangeDAO mexDao) {
+ private void invokeIL(PartnerLinkInstance partnerLink, Operation operation, PartnerRoleChannel partnerRoleChannel,
+ EndpointReference partnerEpr, MessageExchangeDAO mexDao) {
+
// If we couldn't find the endpoint, then there is no sense
// in asking the IL to invoke.
if (partnerEpr == null) {
__log.error("Couldn't find endpoint for partner EPR ");
mexDao.setFailureType(FailureType.UNKNOWN_ENDPOINT.toString());
mexDao.setFaultExplanation("UnknownEndpoint");
- mexDao.setStatus(Status.FAILURE.toString());
+ mexDao.setStatus(Status.ACK.toString());
+ mexDao.setAckType(AckType.FAILURE);
return;
}
- EndpointReference myRoleEpr = null; // TODO: how did we get this?
-
mexDao.setEPR(partnerEpr.toXML().getDocumentElement());
- mexDao.setStatus(MessageExchange.Status.REQUEST.toString());
- Set<InvocationStyle> supportedStyles = _contexts.mexContext.getSupportedInvocationStyle(partnerRoleChannel, partnerEpr);
-
- boolean oneway = MessageExchangePattern.valueOf(mexDao.getPattern()) == MessageExchangePattern.REQUEST_ONLY;
+ mexDao.setStatus(MessageExchange.Status.REQ.toString());
- if (_bpelProcess.isInMemory()) {
- // In-memory processes are a bit different, we're never going to do any scheduling for them, so we'd
- // prefer to have TRANSACTED invocation style.
- if (supportedStyles.contains(InvocationStyle.TRANSACTED)) {
- // If TRANSACTED is supported, this is again easy, do it in-line.
- TransactedPartnerRoleMessageExchangeImpl transactedMex = new TransactedPartnerRoleMessageExchangeImpl(_bpelProcess,
- mexDao.getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr,
- partnerRoleChannel);
- _contexts.mexContext.invokePartnerTransacted(transactedMex);
- } else if (supportedStyles.contains(InvocationStyle.RELIABLE) && oneway) {
- // We can do RELIABLE for in-mem, but only if they are one way.
- ReliablePartnerRoleMessageExchangeImpl reliableMex = new ReliablePartnerRoleMessageExchangeImpl(_bpelProcess,
- mexDao.getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr,
- partnerRoleChannel);
- _contexts.mexContext.invokePartnerReliable(reliableMex);
-
- } else if (supportedStyles.contains(InvocationStyle.UNRELIABLE)) {
- // Need to cheat a little bit for in-memory processes; do the invoke in-line, but first suspend
- // the transaction so that the IL does not get confused.
- Transaction tx;
- try {
- tx = _contexts.txManager.suspend();
- } catch (Exception ex) {
- throw new BpelEngineException("TxManager Error: cannot suspend!", ex);
- }
- try {
- UnreliablePartnerRoleMessageExchangeImpl unreliableMex = new UnreliablePartnerRoleMessageExchangeImpl(_bpelProcess,
- mexDao.getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr,
- partnerRoleChannel);
- _contexts.mexContext.invokePartnerBlocking(unreliableMex);
- unreliableMex.waitForResponse();
- } finally {
- try {
- _contexts.txManager.resume(tx);
- } catch (Exception e) {
- throw new BpelEngineException("TxManager Error: cannot resume!", e);
- }
- }
- }
- } else {
- if (supportedStyles.contains(InvocationStyle.TRANSACTED)) {
+ _bpelProcess.invokeIL(mexDao);
- // If TRANSACTED is supported, this is again easy, do it in-line. Also, this what we always do for
- // in-mem processes (even if the IL claims to not support it.)
- TransactedPartnerRoleMessageExchangeImpl transactedMex = new TransactedPartnerRoleMessageExchangeImpl(_bpelProcess,
- mexDao.getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr,
- partnerRoleChannel);
- _contexts.mexContext.invokePartnerTransacted(transactedMex);
- } else if (supportedStyles.contains(InvocationStyle.RELIABLE)) {
- // If RELIABLE is supported, this is easy, we just do it in-line.
- ReliablePartnerRoleMessageExchangeImpl reliableMex = new ReliablePartnerRoleMessageExchangeImpl(_bpelProcess,
- mexDao.getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr,
- partnerRoleChannel);
- _contexts.mexContext.invokePartnerReliable(reliableMex);
- } else if (supportedStyles.contains(InvocationStyle.UNRELIABLE)) {
- // For BLOCKING invocation, we defer the call until after commit (unless idempotent).
- UnreliablePartnerRoleMessageExchangeImpl blockingMex = new UnreliablePartnerRoleMessageExchangeImpl(_bpelProcess,
- mexDao.getMessageExchangeId(), partnerLink.partnerLink, operation, partnerEpr, myRoleEpr,
- partnerRoleChannel);
- // We schedule in-memory (no db) to guarantee "at most once" semantics.
- schedule(new UnreliableInvoker(blockingMex));
- // TODO: how do we recover the invocation if system dies in BlockingInvoker?
- } else {
- // This really should not happen, indicates IL is screwy.
- __log.error("Integration Layer did not agree to any known invocation style for EPR " + partnerEpr);
- mexDao.setFailureType(FailureType.COMMUNICATION_ERROR.toString());
- mexDao.setStatus(Status.FAILURE.toString());
- mexDao.setFaultExplanation("NoMatchingStyle");
- }
+ // In case a response/fault was available right away, which will happen for BLOCKING/TRANSACTED invocations,
+ // we need to inject a message on the response channel, so that the process continues.
+ switch (Status.valueOf(mexDao.getStatus())) {
+ case REQ:
+ break;
+ case ACK:
+ injectPartnerResponse(mexDao.getMessageExchangeId(), mexDao.getChannel());
+ break;
+ default:
+ throw new AssertionError("Unexpected MEX status: " + mexDao.getStatus());
}
}
- private void schedule(final Runnable runnable) {
- _contexts.registerCommitSynchronizer(new Runnable() {
- public void run() {
- _instanceWorker.enqueue(runnable);
- }
- });
- }
-
/**
* Invoke a partner process directly (via the engine), bypassing the Integration Layer. Obviously this can only be used when an
* process is partners with another process hosted on the same engine.
@@ -887,8 +810,7 @@
* @param outgoingMessage
* @param partnerRoleMex
*/
- private void invokeP2P(BpelProcess target, QName serviceName, Operation operation, Element outgoingMessage,
- MessageExchangeDAO partnerRoleMex) {
+ private void invokeP2P(BpelProcess target, QName serviceName, Operation operation, MessageExchangeDAO partnerRoleMex) {
if (BpelProcess.__log.isDebugEnabled()) {
__log.debug("Invoking in a p2p interaction, partnerrole " + partnerRoleMex.getMessageExchangeId());
}
@@ -950,9 +872,8 @@
if (mySessionId != null)
myRoleMex.setProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID, mySessionId);
- partnerRoleMex.setStatus(MessageExchange.Status.ASYNC.toString());
target.invokeProcess(myRoleMex);
- // TODO: perhaps we should check if the other process finished ,or will it always
+ // TODO: perhaps we should check if the other process finished ,or will it always?
}
void execute() {
@@ -973,7 +894,7 @@
if (!ProcessState.isFinished(_dao.getState())) {
if (__log.isDebugEnabled())
__log.debug("Setting execution state on instance " + _iid);
- _soup.setGlobalData(_outstandingRequests);
+ _soup.setGlobalData(getORM());
if (_bpelProcess.isInMemory()) {
// don't serialize in-memory processes
@@ -1029,8 +950,9 @@
evt.setNewState(ProcessState.STATE_ACTIVE);
sendEvent(evt);
}
+
- _outstandingRequests.associate(responsechannel, mexdao.getMessageExchangeId());
+ getORM().associate(responsechannel, mexdao.getMessageExchangeId());
final String mexId = mexdao.getMessageExchangeId();
_vpu.inject(new JacobRunnable() {
@@ -1047,7 +969,7 @@
// In case this is a pick event, we remove routes,
// and cancel the outstanding requests.
_dao.getProcess().removeRoutes(timerResponseChannel, _dao);
- _outstandingRequests.cancel(timerResponseChannel);
+ getORM().cancel(timerResponseChannel);
// Ignore timer events after the process is finished.
if (ProcessState.isFinished(_dao.getState())) {
@@ -1070,7 +992,7 @@
// receive/reply association.
final String id = timerResponseChannel.export();
_dao.getProcess().removeRoutes(id, _dao);
- _outstandingRequests.cancel(id);
+ getORM().cancel(id);
_vpu.inject(new JacobRunnable() {
private static final long serialVersionUID = 6157913683737696396L;
@@ -1118,7 +1040,7 @@
MessageExchange.Status status = MessageExchange.Status.valueOf(mex.getStatus());
- switch (status) {
+ switch (mex.getAckType()) {
case FAULT:
evt.setAspect(ProcessMessageExchangeEvent.PARTNER_FAULT);
responseChannel.onFault();
@@ -1184,7 +1106,7 @@
*
*/
private void cleanupOutstandingMyRoleExchanges(FaultData optionalFaultData) {
- String[] mexRefs = _outstandingRequests.releaseAll();
+ String[] mexRefs = getORM().releaseAll();
for (String mexId : mexRefs) {
MessageExchangeDAO mexDao = _dao.getConnection().getMessageExchange(mexId);
if (mexDao != null) {
@@ -1192,20 +1114,26 @@
MessageExchangePattern pattern = MessageExchange.MessageExchangePattern.valueOf(mexDao.getPattern());
InvocationStyle istyle = InvocationStyle.valueOf(mexDao.getInvocationStyle());
if (pattern == MessageExchangePattern.REQUEST_ONLY) {
- mexDao.setStatus(Status.COMPLETED_OK.toString());
+ mexDao.setAckType(AckType.ONEWAY);
+ mexDao.setStatus(Status.COMPLETED.toString());
continue;
}
+ mexDao.setAckType(AckType.FAILURE);
mexDao.setFailureType(FailureType.NO_RESPONSE.toString());
if (optionalFaultData != null) {
mexDao.setFaultExplanation(optionalFaultData.toString());
}
mexDao.setFaultExplanation("Process completed without responding.");
- doMyRoleResponse(mexDao, status, Status.FAILURE);
+ doMyRoleResponse(mexDao, status, Status.ACK);
}
}
}
+ private OutstandingRequestManager getORM() {
+ return (OutstandingRequestManager) _soup.getGlobalData();
+ }
+
private void cleanupOutstandingMyRoleExchanges() {
cleanupOutstandingMyRoleExchanges(null);
}
@@ -1230,27 +1158,16 @@
throw new BpelEngineException(msg);
}
- MessageExchange.Status status = MessageExchange.Status.valueOf(dao.getStatus());
- switch (status) {
- case ASYNC:
- case REQUEST:
- MessageDAO request = dao.getRequest();
- if (request == null) {
- // this also should not happen
- String msg = "Engine requested request for message exchange that did not have one: " + mexId;
- __log.fatal(msg);
- throw new BpelEngineException(msg);
- }
-
- return request.getData();
-
- default:
- // We should not be in any other state when requesting this.
- String msg = "Engine requested response while the message exchange " + mexId + " was in the state " + status;
+ MessageDAO request = dao.getRequest();
+ if (request == null) {
+ // this also should not happen
+ String msg = "Engine requested request for message exchange that did not have one: " + mexId;
__log.fatal(msg);
throw new BpelEngineException(msg);
}
+ return request.getData();
+
}
public QName getPartnerFault(String mexId) {
@@ -1284,9 +1201,7 @@
MessageDAO response;
MessageExchange.Status status = MessageExchange.Status.valueOf(dao.getStatus());
- switch (status) {
- case FAULT:
- case RESPONSE:
+ if (status == Status.ACK) {
response = dao.getResponse();
if (response == null) {
// this also should not happen
@@ -1294,8 +1209,7 @@
__log.fatal(msg);
throw new BpelEngineException(msg);
}
- break;
- default:
+ } else {
// We should not be in any other state when requesting this.
String msg = "Engine requested response while the message exchange " + mexId + " was in the state " + status;
__log.fatal(msg);
@@ -1441,7 +1355,6 @@
}
}
-
private void scheduleReliableResponse(MessageExchangeDAO messageExchange) {
assert !_bpelProcess.isInMemory() : "Internal error; attempt to schedule in-memory process";
assert _contexts.isTransacted();
@@ -1464,51 +1377,4 @@
}
- /**
- * Runnable that actually performs UNRELIABLE invokes on the partner.
- *
- * @author Maciej Szefler <mszefler at gmail dot com>
- *
- */
- class UnreliableInvoker implements Runnable {
-
- UnreliablePartnerRoleMessageExchangeImpl _blockingMex;
-
- public UnreliableInvoker(UnreliablePartnerRoleMessageExchangeImpl blockingMex) {
- _blockingMex = blockingMex;
- }
-
- public void run() {
- assert !_contexts.isTransacted();
-
- // TODO: what happens if system fails right here? we'll need to add a "retry" possibility
-
- Runnable prc;
- try {
- _contexts.mexContext.invokePartnerBlocking(_blockingMex);
- prc = new PartnerResponseContinuation(_blockingMex);
- } catch (Exception ce) {
- prc = new PartnerResponseContinuation(_blockingMex);
- }
-
- // Keep using the same thread to do the work, but note we need to run this in a transaction.
- _instanceWorker.enqueue(_bpelProcess._server.new TransactedRunnable(prc));
- }
-
- }
-
-
- class PartnerResponseContinuation implements Runnable {
-
- private UnreliablePartnerRoleMessageExchangeImpl _mex;
-
- public PartnerResponseContinuation(UnreliablePartnerRoleMessageExchangeImpl blockingMex) {
- _mex = blockingMex;
- }
-
- public void run() {
-
- }
-
- }
}
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java?view=diff&rev=563267&r1=563266&r2=563267
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java Mon Aug 6 13:47:58 2007
@@ -37,6 +37,7 @@
import org.apache.ode.bpel.iapi.InvocationStyle;
import org.apache.ode.bpel.iapi.Message;
import org.apache.ode.bpel.iapi.MessageExchange;
+import org.apache.ode.bpel.iapi.MessageExchange.AckType;
import org.apache.ode.bpel.o.OPartnerLink;
import org.apache.ode.utils.msg.MessageBundle;
import org.w3c.dom.Element;
@@ -129,14 +130,16 @@
private Set<String> _propNames;
+ private AckType _ackType;
+
public MessageExchangeImpl(
BpelProcess process,
- String mexId,
+ Long iid,
+ String mexId,
OPartnerLink oplink,
- PortType ptype,
- Operation operation) {
+ PortType ptype, Operation operation) {
_process = process;
_contexts = process._contexts;
_mexId = mexId;
@@ -150,11 +153,14 @@
return _mexId.equals(((MessageExchangeImpl)other)._mexId);
}
+ Long getIID() {
+ return _iid;
+ }
void load(MessageExchangeDAO dao) {
_timeout = dao.getTimeout();
_iid = dao.getInstance() != null ? dao.getInstance().getInstanceId() : null;
-
+ _ackType = dao.getAckType();
if (_fault == null)
_fault = dao.getFault();
if (_explanation == null)
@@ -163,7 +169,7 @@
_status = Status.valueOf(dao.getStatus());
}
- public void save(MessageExchangeDAO dao) {
+ void save(MessageExchangeDAO dao) {
dao.setPartnerLinkModelId(_oplink.getId());
dao.setOperation(_operation.getName());
dao.setStatus(_status.toString());
@@ -172,7 +178,7 @@
dao.setFaultExplanation(_explanation);
dao.setTimeout(_timeout);
dao.setFailureType(_failureType == null ? null : _failureType.toString());
-
+ dao.setAckType(_ackType);
if (_changes.contains(Change.REQUEST)) {
MessageDAO requestDao = dao.createMessage(_request.getType());
@@ -260,6 +266,10 @@
return _status;
}
+ public AckType getAckType() {
+ return _ackType;
+ }
+
public Operation getOperation() {
return _operation;
}
@@ -299,9 +309,13 @@
}
-
- void setStatus(Status status) {
- _status = status;
+ void request() {
+ _status = Status.REQ;
+ }
+
+ void ack(AckType ackType) {
+ _status = Status.ACK;
+ _ackType = ackType;
}
public Message createMessage(javax.xml.namespace.QName msgType) {
Added: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MexDaoUtil.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MexDaoUtil.java?view=auto&rev=563267
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MexDaoUtil.java (added)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MexDaoUtil.java Mon Aug 6 13:47:58 2007
@@ -0,0 +1,46 @@
+package org.apache.ode.bpel.engine;
+
+import javax.xml.namespace.QName;
+
+import org.apache.ode.bpel.dao.MessageDAO;
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
+import org.apache.ode.bpel.iapi.MessageExchange.AckType;
+import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
+import org.apache.ode.bpel.iapi.MessageExchange.Status;
+import org.w3c.dom.Element;
+
+/**
+ * Some handy utilities methods for dealing with MEX daos.
+ *
+ * @author Maciej Szefler <mszefler at gmail dot com>
+ *
+ */
+class MexDaoUtil {
+
+ static void setFailed(MessageExchangeDAO mex, FailureType ftype, String explanation) {
+ mex.setStatus(Status.ACK.toString());
+ mex.setAckType(AckType.FAILURE);
+ mex.setFailureType(ftype.toString());
+ mex.setFaultExplanation(explanation);
+ }
+
+ static void setFaulted(MessageExchangeDAO mex, QName faultType, Element faultmsg) {
+ mex.setStatus(Status.ACK.toString());
+ mex.setAckType(AckType.FAULT);
+ mex.setFailureType(null);
+ mex.setFault(faultType);
+ MessageDAO flt = mex.createMessage(faultType);
+ flt.setData(faultmsg);
+ mex.setResponse(flt);
+ }
+
+ static void setResponse(MessageExchangeDAO mex, Element response) {
+ mex.setStatus(Status.ACK.toString());
+ mex.setAckType(AckType.RESPONSE);
+ mex.setFailureType(null);
+ mex.setFault(null);
+ MessageDAO resp = mex.createMessage(null);
+ resp.setData(response);
+ mex.setResponse(resp);
+ }
+}
Propchange: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MexDaoUtil.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeCache.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeCache.java?view=auto&rev=563267
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeCache.java (added)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeCache.java Mon Aug 6 13:47:58 2007
@@ -0,0 +1,70 @@
+package org.apache.ode.bpel.engine;
+
+import java.lang.ref.WeakReference;
+import java.util.HashMap;
+import java.util.Iterator;
+
+/**
+ * Manage {@link MyRoleMessageExchangeImpl} object references.
+ *
+ * @author Maciej Szefler <mszefler at gmail dot com>
+ *
+ */
+class MyRoleMessageExchangeCache {
+
+ private static final int CLEANUP_PERIOD = 20;
+
+ private HashMap<String, WeakReference<MyRoleMessageExchangeImpl>> _cache;
+
+ private int _inserts = 0;
+
+ void put(MyRoleMessageExchangeImpl mex) {
+ synchronized (this) {
+ ++_inserts;
+ if (_inserts > CLEANUP_PERIOD) {
+ cleanup();
+ }
+
+ WeakReference<MyRoleMessageExchangeImpl> ref = _cache.get(mex.getMessageExchangeId());
+ if (ref != null && ref.get() != null)
+ throw new IllegalStateException("InternalError: duplicate myrolemex registration!");
+
+ _cache.put(mex.getMessageExchangeId(), new WeakReference<MyRoleMessageExchangeImpl>(mex));
+ }
+ }
+
+ /**
+ * Attempt to retrieve a {@link MyRoleMessageExchangeImpl} for the given identifier.
+ * @param mexId
+ * @return
+ */
+ MyRoleMessageExchangeImpl get(String mexId) {
+ synchronized(this) {
+ WeakReference<MyRoleMessageExchangeImpl> ref = _cache.get(mexId);
+ if (ref == null)
+ return null;
+ MyRoleMessageExchangeImpl mex = ref.get();
+ if (mex == null)
+ _cache.remove(mexId);
+ return mex;
+
+ }
+
+ }
+
+ /**
+ * Remove stale references.
+ *
+ */
+ void cleanup() {
+ synchronized(this){
+ for (Iterator<WeakReference<MyRoleMessageExchangeImpl>> i = _cache.values().iterator(); i.hasNext(); ) {
+ WeakReference<MyRoleMessageExchangeImpl> ref = i.next();
+ if (ref.get() == null)
+ i.remove();
+ }
+
+ _inserts = 0;
+ }
+ }
+}
Propchange: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeCache.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java?view=diff&rev=563267&r1=563266&r2=563267
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java Mon Aug 6 13:47:58 2007
@@ -33,14 +33,14 @@
protected String _clientId;
public MyRoleMessageExchangeImpl(BpelProcess process, String mexId, OPartnerLink oplink, Operation operation, QName callee) {
- super(process, mexId, oplink, oplink.myRolePortType, operation);
+ super(process, null, mexId, oplink, oplink.myRolePortType, operation);
_callee = callee;
}
public CorrelationStatus getCorrelationStatus() {
return _cstatus;
}
-
+
@Override
void load(MessageExchangeDAO dao) {
super.load(dao);
@@ -116,7 +116,6 @@
we1.setProcessId(_process.getPID());
we1.setMexId(_mexId);
- setStatus(Status.ASYNC);
_contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
_contexts.scheduler.schedulePersistedJob(we1.getDetail(), null);
@@ -160,40 +159,41 @@
protected void onStateChanged(MessageExchangeDAO mexdao, Status oldstatus, final Status newstatus) {
MessageDAO response = mexdao.getResponse();
- switch (newstatus) {
- case RESPONSE: {
- final Element msg = response.getData();
- final QName msgtype = response.getType();
- _process.scheduleRunnable(new Runnable() {
- public void run() {
- serverResponded(new MemBackedMessageImpl(msg, msgtype, true));
- }
- });
- }
- break;
- case FAULT: {
- final QName fault = mexdao.getFault();
- final Element faultMsg = response.getData();
- final QName msgtype = response.getType();
- _process.scheduleRunnable(new Runnable() {
- public void run() {
- serverFaulted(fault, new MemBackedMessageImpl(faultMsg, msgtype, true));
- }
-
- });
- }
- break;
- case FAILURE:
- final String failureExplanation = mexdao.getFaultExplanation();
- final FailureType ftype = FailureType.valueOf(mexdao.getFailureType());
- _process.scheduleRunnable(new Runnable() {
- public void run() {
- serverFailed(ftype, failureExplanation, null); // TODO add failure detail
- }
-
- });
- break;
- }
+ if (newstatus == Status.ACK)
+ switch (mexdao.getAckType()) {
+ case RESPONSE: {
+ final Element msg = response.getData();
+ final QName msgtype = response.getType();
+ _process.scheduleRunnable(new Runnable() {
+ public void run() {
+ serverResponded(new MemBackedMessageImpl(msg, msgtype, true));
+ }
+ });
+ }
+ break;
+ case FAULT: {
+ final QName fault = mexdao.getFault();
+ final Element faultMsg = response.getData();
+ final QName msgtype = response.getType();
+ _process.scheduleRunnable(new Runnable() {
+ public void run() {
+ serverFaulted(fault, new MemBackedMessageImpl(faultMsg, msgtype, true));
+ }
+
+ });
+ }
+ break;
+ case FAILURE:
+ final String failureExplanation = mexdao.getFaultExplanation();
+ final FailureType ftype = FailureType.valueOf(mexdao.getFailureType());
+ _process.scheduleRunnable(new Runnable() {
+ public void run() {
+ serverFailed(ftype, failureExplanation, null); // TODO add failure detail
+ }
+
+ });
+ break;
+ }
}
protected void finalize() {
@@ -204,7 +204,7 @@
void serverFaulted(QName faultType, Message outputFaultMessage) throws BpelEngineException {
_fault = faultType;
_response = (MessageImpl) outputFaultMessage;
- setStatus(Status.FAULT);
+ ack(AckType.FAULT);
}
@@ -213,15 +213,14 @@
_explanation = null;
_response = (MessageImpl) outputMessage;
_response.makeReadOnly();
- setStatus(Status.RESPONSE);
+ ack(AckType.RESPONSE);
}
void serverFailed(FailureType type, String reason, Element details) {
_failureType = type;
_explanation = reason;
- setStatus(Status.FAILURE);
-
+ ack(AckType.FAILURE);
}
}
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java?view=diff&rev=563267&r1=563266&r2=563267
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java Mon Aug 6 13:47:58 2007
@@ -18,6 +18,15 @@
*/
package org.apache.ode.bpel.engine;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import javax.wsdl.Operation;
+import javax.xml.namespace.QName;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.common.CorrelationKey;
@@ -32,28 +41,21 @@
import org.apache.ode.bpel.evt.CorrelationNoMatchEvent;
import org.apache.ode.bpel.evt.NewProcessInstanceEvent;
import org.apache.ode.bpel.iapi.Endpoint;
-import org.apache.ode.bpel.iapi.InvocationStyle;
import org.apache.ode.bpel.iapi.MessageExchange;
import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
import org.apache.ode.bpel.iapi.ProcessState;
-import org.apache.ode.bpel.iapi.MessageExchange.Status;
+import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus;
-import org.apache.ode.bpel.intercept.InterceptorInvoker;
import org.apache.ode.bpel.o.OMessageVarType;
import org.apache.ode.bpel.o.OPartnerLink;
import org.apache.ode.bpel.o.OProcess;
import org.apache.ode.bpel.o.OScope;
import org.apache.ode.bpel.runtime.InvalidProcessException;
-import org.apache.ode.bpel.runtime.PROCESS;
import org.apache.ode.utils.ArrayUtils;
import org.apache.ode.utils.ObjectPrinter;
import org.apache.ode.utils.msg.MessageBundle;
import org.w3c.dom.Element;
-import javax.wsdl.Operation;
-import javax.xml.namespace.QName;
-import java.util.*;
-
/**
* @author Matthieu Riou <mriou at apache dot org>
*/
@@ -96,9 +98,7 @@
Operation operation = getMyRoleOperation(mex.getOperation());
if (operation == null) {
__log.error(__msgs.msgUnknownOperation(mex.getOperation(), _plinkDef.myRolePortType.getQName()));
- mex.setStatus(Status.FAILURE.toString());
- mex.setFailureType(MessageExchange.FailureType.UNKNOWN_OPERATION.toString());
- mex.setFaultExplanation(mex.getOperation());
+ MexDaoUtil.setFailed(mex, FailureType.UNKNOWN_OPERATION, mex.getOperation());
return null;
}
@@ -113,9 +113,8 @@
if (isCreateInstnace)
invokeMyRoleCreateInstance(mex, operation, correlatorId, correlator);
else {
- mex.setStatus(Status.FAILURE.toString());
- mex.setFailureType(MessageExchange.FailureType.OTHER.toString());
- mex.setFaultExplanation("Invalid in-memory process: non createInstance operations are not supported!");
+ MexDaoUtil.setFailed(mex, FailureType.OTHER,
+ "Invalid in-memory process: non createInstance operations are not supported!");
return null;
}
@@ -139,10 +138,7 @@
// We'd like to do a graceful exit here, no sense in rolling back due to a
// a message format problem.
__log.debug("Unable to evaluate correlation keys, invalid message format. ", ime);
- mex.setFailureType(MessageExchange.FailureType.FORMAT_ERROR.toString());
- mex.setStatus(Status.FAILURE.toString());
- mex.setFaultExplanation(ime.getMessage());
-
+ MexDaoUtil.setFailed(mex, FailureType.FORMAT_ERROR, ime.getMessage());
return null;
}
@@ -201,8 +197,8 @@
mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.MATCHED.toString());
mex.setInstance(messageRoute.getTargetInstance());
-
- // We're overloading the channel here to be the PICK response channel + index
+
+ // We're overloading the channel here to be the PICK response channel + index
mex.setChannel(messageRoute.getGroupId() + "&" + messageRoute.getIndex());
} else {
if (__log.isDebugEnabled()) {
@@ -227,15 +223,7 @@
}
}
- // Now we have to update our message exchange status. If the <reply>
- // was not hit during the
- // invocation, then we will be in the "REQUEST" phase which means
- // that either this was a one-way
- // or a two-way that needs to delivery the reply asynchronously.
- if (Status.valueOf(mex.getStatus()) == MessageExchange.Status.REQUEST) {
- mex.setStatus(MessageExchange.Status.ASYNC.toString());
- }
-
+
return CorrelationStatus.valueOf(mex.getCorrelationStatus());
}
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java?view=diff&rev=563267&r1=563266&r2=563267
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkPartnerRoleImpl.java Mon Aug 6 13:47:58 2007
@@ -18,14 +18,33 @@
*/
package org.apache.ode.bpel.engine;
+import java.util.Set;
+
+import javax.transaction.Transaction;
+import javax.wsdl.Operation;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
+import org.apache.ode.bpel.engine.PartnerRoleMessageExchangeImpl.State;
+import org.apache.ode.bpel.iapi.BpelEngineException;
import org.apache.ode.bpel.iapi.Endpoint;
+import org.apache.ode.bpel.iapi.EndpointReference;
+import org.apache.ode.bpel.iapi.InvocationStyle;
import org.apache.ode.bpel.iapi.PartnerRoleChannel;
+import org.apache.ode.bpel.iapi.MessageExchange.AckType;
+import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
+import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
+import org.apache.ode.bpel.iapi.MessageExchange.Status;
import org.apache.ode.bpel.o.OPartnerLink;
+import org.w3c.dom.Element;
/**
+ *
+ * Class providing a lot of the dirty work of IL invokes.
+ *
* @author Matthieu Riou <mriou at apache dot org>
+ * @author Maciej Szefler <mszefler at gmail dot com>
*/
class PartnerLinkPartnerRoleImpl extends PartnerLinkRoleImpl {
static final Log __log = LogFactory.getLog(BpelProcess.class);
@@ -39,4 +58,297 @@
_initialPartner = initialPartner;
}
+ PartnerRoleMessageExchangeImpl createPartnerRoleMex(MessageExchangeDAO mexdao) {
+ InvocationStyle istyle = InvocationStyle.valueOf(mexdao.getInvocationStyle());
+ PartnerRoleMessageExchangeImpl mex;
+ Operation op = _plinkDef.getPartnerRoleOperation(mexdao.getOperation());
+ EndpointReference myroleEPR = _plinkDef.hasMyRole() ? _process.getInitialMyRoleEPR(_plinkDef) : null;
+ switch (istyle) {
+ case UNRELIABLE:
+ mex = new UnreliablePartnerRoleMessageExchangeImpl(_process, mexdao.getInstance().getInstanceId(), mexdao
+ .getMessageExchangeId(), _plinkDef, op, /* EPR todo */
+ null, myroleEPR, _channel);
+ break;
+ case TRANSACTED:
+ mex = new TransactedPartnerRoleMessageExchangeImpl(_process, mexdao.getInstance().getInstanceId(), mexdao
+ .getMessageExchangeId(), _plinkDef, op, /*
+ * EPR todo
+ */
+ null, myroleEPR, _channel);
+ break;
+ case RELIABLE:
+ mex = new ReliablePartnerRoleMessageExchangeImpl(_process, mexdao.getInstance().getInstanceId(), mexdao
+ .getMessageExchangeId(), _plinkDef, op, null, /* EPR todo */
+ myroleEPR, _channel);
+ break;
+
+ default:
+ throw new BpelEngineException("Unexpected InvocationStyle: " + istyle);
+
+ }
+
+ mex.load(mexdao);
+ return mex;
+
+ }
+
+ /**
+ * Invoke a partner through the integration layer.
+ *
+ * @param mexDao
+ */
+ void invokeIL(MessageExchangeDAO mexDao) {
+
+ Element partnerEprXml = mexDao.getEPR();
+ EndpointReference partnerEpr = partnerEprXml == null ? _initialEPR : _contexts.eprContext
+ .resolveEndpointReference(partnerEprXml);
+ EndpointReference myRoleEpr = null; // TODO: fix?
+ Operation operation = _plinkDef.getPartnerRoleOperation(mexDao.getOperation());
+ Set<InvocationStyle> supportedStyles = _contexts.mexContext.getSupportedInvocationStyle(_channel, partnerEpr);
+
+ boolean oneway = MessageExchangePattern.valueOf(mexDao.getPattern()) == MessageExchangePattern.REQUEST_ONLY;
+
+ if (_process.isInMemory()) {
+ invokeInMem(mexDao, partnerEpr, myRoleEpr, operation, supportedStyles, oneway);
+ } else {
+ invokePersisted(mexDao, partnerEpr, myRoleEpr, operation, supportedStyles);
+ }
+
+ }
+
+ private void invokePersisted(MessageExchangeDAO mexDao, EndpointReference partnerEpr, EndpointReference myRoleEpr,
+ Operation operation, Set<InvocationStyle> supportedStyles) {
+ if (supportedStyles.contains(InvocationStyle.TRANSACTED)) {
+ invokeTransacted(mexDao, partnerEpr, myRoleEpr, operation);
+ } else if (supportedStyles.contains(InvocationStyle.RELIABLE)) {
+ invokeReliable(mexDao, partnerEpr, myRoleEpr, operation);
+ } else if (supportedStyles.contains(InvocationStyle.UNRELIABLE)) {
+ invokeUnreliable(mexDao, partnerEpr, myRoleEpr, operation);
+ } else {
+ // This really should not happen, indicates IL is screwy.
+ __log.error("Integration Layer did not agree to any known invocation style for EPR " + partnerEpr);
+ mexDao.setFailureType(FailureType.COMMUNICATION_ERROR.toString());
+ mexDao.setStatus(Status.ACK.toString());
+ mexDao.setAckType(AckType.FAILURE);
+ mexDao.setFaultExplanation("NoMatchingStyle");
+ }
+ }
+
+ private void invokeUnreliable(MessageExchangeDAO mexDao, EndpointReference partnerEpr, EndpointReference myRoleEpr,
+ Operation operation) {
+ // For BLOCKING invocation, we defer the call until after commit (unless idempotent).
+ UnreliablePartnerRoleMessageExchangeImpl blockingMex = new UnreliablePartnerRoleMessageExchangeImpl(_process, mexDao
+ .getInstance().getInstanceId(), mexDao.getMessageExchangeId(), _plinkDef, operation, partnerEpr, myRoleEpr,
+ _channel);
+ // We schedule in-memory (no db) to guarantee "at most once" semantics.
+ blockingMex.setState(State.INVOKE_XXX);
+ _process.scheduleInstanceWork(mexDao.getInstance().getInstanceId(), new UnreliableInvoker(blockingMex));
+ }
+
+ /**
+ * Invoke an in-memory process. In-memory processes are a bit different, we're never going to do any scheduling for them, so
+ * we'd prefer to have TRANSACTED invocation style. If that is not available we have to fake it.
+ *
+ * @param mexDao
+ * @param partnerEpr
+ * @param myRoleEpr
+ * @param operation
+ * @param supportedStyles
+ * @param oneway
+ */
+ private void invokeInMem(MessageExchangeDAO mexDao, EndpointReference partnerEpr, EndpointReference myRoleEpr,
+ Operation operation, Set<InvocationStyle> supportedStyles, boolean oneway) {
+ // In-memory processes are a bit different, we're never going to do any scheduling for them, so we'd
+ // prefer to have TRANSACTED invocation style.
+ if (supportedStyles.contains(InvocationStyle.TRANSACTED)) {
+ invokeTransacted(mexDao, partnerEpr, myRoleEpr, operation);
+ } else if (supportedStyles.contains(InvocationStyle.RELIABLE) && oneway) {
+ invokeReliable(mexDao, partnerEpr, myRoleEpr, operation);
+ } else if (supportedStyles.contains(InvocationStyle.UNRELIABLE)) {
+ UnreliablePartnerRoleMessageExchangeImpl unreliableMex = new UnreliablePartnerRoleMessageExchangeImpl(_process, mexDao
+ .getInstance().getInstanceId(), mexDao.getMessageExchangeId(), _plinkDef, operation, partnerEpr, myRoleEpr,
+ _channel);
+
+ // Need to cheat a little bit for in-memory processes; do the invoke in-line, but first suspend
+ // the transaction so that the IL does not get confused.
+ Transaction tx;
+ try {
+ tx = _contexts.txManager.suspend();
+ } catch (Exception ex) {
+ throw new BpelEngineException("TxManager Error: cannot suspend!", ex);
+ }
+
+ try {
+ unreliableMex.setState(State.INVOKE_XXX);
+ _contexts.mexContext.invokePartnerBlocking(unreliableMex);
+ try {
+ unreliableMex.waitForAck(mexDao.getTimeout());
+ } catch (InterruptedException ie) {
+ ;
+ ; // ignore
+ }
+
+ } finally {
+ unreliableMex.setState(State.DEAD);
+ try {
+ _contexts.txManager.resume(tx);
+ } catch (Exception e) {
+ throw new BpelEngineException("TxManager Error: cannot resume!", e);
+ }
+ }
+
+ if (unreliableMex.getStatus() != Status.ACK) {
+ MexDaoUtil.setFailed(mexDao, FailureType.NO_RESPONSE, "No Response");
+ } else {
+ unreliableMex.save(mexDao);
+ }
+ } else /* non-supported in-mem style */{
+ MexDaoUtil.setFailed(mexDao, FailureType.OTHER, "Unsupported invocation style for in-mem process.");
+ }
+ }
+
+ private void invokeReliable(MessageExchangeDAO mexDao, EndpointReference partnerEpr, EndpointReference myRoleEpr,
+ Operation operation) {
+ // We can do RELIABLE for in-mem, but only if they are one way.
+ ReliablePartnerRoleMessageExchangeImpl reliableMex = new ReliablePartnerRoleMessageExchangeImpl(_process, mexDao
+ .getInstance().getInstanceId(), mexDao.getMessageExchangeId(), _plinkDef, operation, partnerEpr, myRoleEpr,
+ _channel);
+ reliableMex.setState(State.INVOKE_XXX);
+ Throwable err = null;
+ try {
+ _contexts.mexContext.invokePartnerReliable(reliableMex);
+ } catch (Throwable t) {
+ err = t;
+ }
+
+ reliableMex.setState(State.HOLD);
+
+ if (err != null) {
+ MexDaoUtil.setFailed(mexDao,FailureType.COMMUNICATION_ERROR, err.toString());
+ reliableMex.setState(State.DEAD);
+ } else {
+ if (reliableMex.getStatus() == Status.ACK) {
+ reliableMex.save(mexDao);
+ reliableMex.setState(State.DEAD);
+ } else
+ reliableMex.setState(State.ASYNC);
+ }
+
+ }
+
+ private void invokeTransacted(MessageExchangeDAO mexDao, EndpointReference partnerEpr, EndpointReference myRoleEpr,
+ Operation operation) {
+ // If TRANSACTED is supported, this is again easy, do it in-line.
+ TransactedPartnerRoleMessageExchangeImpl transactedMex = new TransactedPartnerRoleMessageExchangeImpl(_process, mexDao
+ .getInstance().getInstanceId(), mexDao.getMessageExchangeId(), _plinkDef, operation, partnerEpr, myRoleEpr,
+ _channel);
+ transactedMex.setState(State.INVOKE_XXX);
+ try {
+ _contexts.mexContext.invokePartnerTransacted(transactedMex);
+ } catch (Throwable t) {
+ __log.error("Transacted partner invoke threw an exception; rolling back.");
+ try {
+ _contexts.txManager.setRollbackOnly();
+ } catch (Exception ex) {
+ __log.fatal("TransactionManager error, could not setRollbackOnly()",ex);
+ }
+ throw new BpelEngineException("Rollback required.",t);
+ } finally {
+ transactedMex.setState(State.DEAD);
+ }
+
+ if (transactedMex.getStatus() != Status.ACK) {
+ MexDaoUtil.setFailed(mexDao, FailureType.NO_RESPONSE, "Integration Layer did not provide required ACK.");
+ } else {
+ transactedMex.save(mexDao);
+ }
+
+ }
+
+ /**
+ * Runnable that actually performs UNRELIABLE invokes on the partner.
+ *
+ * @author Maciej Szefler <mszefler at gmail dot com>
+ *
+ */
+ class UnreliableInvoker implements Runnable {
+
+ UnreliablePartnerRoleMessageExchangeImpl _unreliableMex;
+
+ BpelInstanceWorker _iworker;
+
+ /** Keep a copy of the last BpelRuntimeContextImpl; this is used to optimize away a DB read. */
+ BpelRuntimeContextImpl _lastBRC;
+
+ /**
+ *
+ * @param blockingMex
+ * the MEX we're invoking on the partner
+ * @param iworker
+ * instance worker (for scheduling continuation)
+ * @param lastBpelRuntimeContextImpl
+ * the BRC that initiated this invoke
+ */
+ public UnreliableInvoker(UnreliablePartnerRoleMessageExchangeImpl blockingMex) {
+ _unreliableMex = blockingMex;
+ }
+
+ public void run() {
+ assert !_contexts.isTransacted();
+
+ // Do the unreliable invoke (outside of tx context). A system failure here will result in the mex going
+ // into an unknown state requiring manual intervention.
+ Throwable err = null;
+ Status status;
+ _unreliableMex.setState(State.INVOKE_XXX);
+ try {
+ _contexts.mexContext.invokePartnerBlocking(_unreliableMex);
+ _unreliableMex.setState(State.HOLD);
+ } catch (Throwable t) {
+ _unreliableMex.setState(State.DEAD);
+ err = t;
+ }
+
+ final Throwable ferr = err;
+
+ // We proceed handling the response in a transaction. Note that if for some reason the following transaction
+ // fails, the unreliable invoke will be in an "unknown" state, and will require manual intervention to either
+ // retry or force fail.
+ try {
+
+ _contexts.execTransaction(new Runnable() {
+ public void run() {
+
+ MessageExchangeDAO mexdao = _process.loadMexDao(_unreliableMex.getMessageExchangeId());
+ if (ferr != null) {
+ MexDaoUtil.setFailed(mexdao, FailureType.OTHER, ferr.toString());
+ _unreliableMex.setState(State.DEAD);
+ } else if (_unreliableMex.getStatus() == Status.ACK) {
+ _unreliableMex.save(mexdao);
+ _unreliableMex.setState(State.DEAD);
+ } else if (_unreliableMex.getStatus() == Status.REQ && !_unreliableMex._asyncReply) {
+ MexDaoUtil.setFailed(mexdao, FailureType.NO_RESPONSE, "No Response");
+ _unreliableMex.setState(State.DEAD);
+ } else if (_unreliableMex._asyncReply) {
+ _unreliableMex.setState(State.ASYNC);
+ return;
+ } else {
+ // We should have exhausted the possibilities.
+ throw new BpelEngineException("InternalError: Unexpected message exchange state!");
+ }
+
+ _process.executeContinueInstancePartnerRoleResponseReceived(mexdao);
+
+ }
+
+ });
+ } catch (Throwable t) {
+ _unreliableMex.setState(State.DEAD);
+ __log.error("Transaction Failed (TODO!!!!): Need to mark instance for user action", t);
+ // TODO: Schedule something to pick up the job (we cant just retry bc the invoke is complete!
+ }
+
+ }
+
+ }
}
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkRoleImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkRoleImpl.java?view=diff&rev=563267&r1=563266&r2=563267
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkRoleImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkRoleImpl.java Mon Aug 6 13:47:58 2007
@@ -28,10 +28,12 @@
protected OPartnerLink _plinkDef;
protected EndpointReference _initialEPR;
protected BpelProcess _process;
+ protected Contexts _contexts;
PartnerLinkRoleImpl(BpelProcess process, OPartnerLink plink) {
_plinkDef = plink;
_process = process;
+ _contexts = _process._contexts;
}
String getPartnerLinkName() {
return _plinkDef.name;