You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2007/08/08 19:52:39 UTC
svn commit: r563962 - in
/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine:
BpelProcess.java BpelRuntimeContextImpl.java MyRoleMessageExchangeImpl.java
UnreliableMyRoleMessageExchangeImpl.java WorkEvent.java
Author: mszefler
Date: Wed Aug 8 10:52:37 2007
New Revision: 563962
URL: http://svn.apache.org/viewvc?view=rev&rev=563962
Log:
BART: tweak myrole invokes to do routing at the time of invokeXXX call.
BART: rename some methods in BRC for consistency/clarity.
Modified:
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?view=diff&rev=563962&r1=563961&r2=563962
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Wed Aug 8 10:52:37 2007
@@ -38,6 +38,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.common.CorrelationKey;
import org.apache.ode.bpel.common.FaultException;
import org.apache.ode.bpel.dao.BpelDAOConnection;
import org.apache.ode.bpel.dao.MessageExchangeDAO;
@@ -188,8 +189,8 @@
_hydrationLatch.latch(1);
try {
- // The following check is mostly for sanity purposes. MexImpls should prevent this from
- // happening.
+ // The following check is mostly for sanity purposes. MexImpls should prevent this from
+ // happening.
PartnerLinkMyRoleImpl target = getMyRoleForService(mexdao.getCallee());
Status oldstatus = Status.valueOf(mexdao.getStatus());
if (target == null) {
@@ -215,13 +216,13 @@
return;
}
- // "Acknowledge" any one-way invokes
+ // "Acknowledge" any one-way invokes
if (op.getOutput() == null) {
mexdao.setStatus(Status.ACK.toString());
mexdao.setAckType(AckType.ONEWAY);
fireMexStateEvent(mexdao, oldstatus, Status.ACK);
}
-
+
mexdao.setProcess(getProcessDAO());
// TODO: fix this
@@ -243,13 +244,32 @@
});
} else if (cstatus == CorrelationStatus.MATCHED) {
- doInstanceWork(mexdao.getInstance().getInstanceId(), new Callable<Void>() {
- public Void call() {
- executeContinueInstanceMyRoleRequestReceived(mexdao);
- return null;
- }
- });
-
+ // This should not occur for in-memory processes, since they are technically not allowed to
+ // have any <receive>/<pick> elements that are not start activities.
+ if (isInMemory())
+ __log.warn("In-memory process " + _pid + " is participating in a non-createinstance exchange!");
+
+
+ // We don't like to do the work in the same TX that did the matching, since this creates fertile
+ // conditions for deadlock in the correlation tables. However if invocation style is transacted,
+ // we need to do the work right then and there.
+
+ if (istyle == InvocationStyle.TRANSACTED)
+ doInstanceWork(mexdao.getInstance().getInstanceId(), new Callable<Void>() {
+ public Void call() {
+ executeContinueInstanceMyRoleRequestReceived(mexdao);
+ return null;
+ }
+ });
+ else /* non-transacted style */ {
+ WorkEvent we = new WorkEvent();
+ we.setType(WorkEvent.Type.MYROLE_INVOKE);
+ we.setIID(mexdao.getInstance().getInstanceId());
+ we.setMexId(mexdao.getMessageExchangeId());
+ we.setProcessId(_pid);
+
+ scheduleWorkEvent(we, null);
+ }
} else if (cstatus == CorrelationStatus.QUEUED) {
; // do nothing
}
@@ -280,7 +300,7 @@
int amp = mexdao.getChannel().indexOf('&');
String groupId = mexdao.getChannel().substring(0, amp);
int idx = Integer.valueOf(mexdao.getChannel().substring(amp + 1));
- instance.inputMsgMatch(groupId, idx, mexdao);
+ instance.injectMyRoleMessageExchange(groupId, idx, mexdao);
instance.execute();
}
@@ -295,6 +315,46 @@
brc.execute();
}
+
+ void executeContinueInstanceResume(ProcessInstanceDAO instanceDao) {
+ BpelInstanceWorker worker = _instanceWorkerCache.get(instanceDao.getInstanceId());
+ assert worker.isWorkerThread();
+
+ BpelRuntimeContextImpl brc = new BpelRuntimeContextImpl(worker, instanceDao);
+ brc.execute();
+
+ }
+
+ void executeContinueInstanceTimerReceived(ProcessInstanceDAO instanceDao, String timerChannel) {
+ BpelInstanceWorker worker = _instanceWorkerCache.get(instanceDao.getInstanceId());
+ assert worker.isWorkerThread();
+
+ BpelRuntimeContextImpl brc = new BpelRuntimeContextImpl(worker, instanceDao);
+ if (brc.injectTimerEvent(timerChannel))
+ brc.execute();
+
+ }
+
+ private void executeContinueInstanceMatcherEvent(ProcessInstanceDAO instanceDao, String correlatorId, CorrelationKey correlationKey) {
+ BpelInstanceWorker worker = _instanceWorkerCache.get(instanceDao.getInstanceId());
+ assert worker.isWorkerThread();
+
+ BpelRuntimeContextImpl brc = new BpelRuntimeContextImpl(worker, instanceDao);
+ if(brc.matcherEvent(correlatorId, correlationKey))
+ brc.execute();
+
+ }
+
+ private void executeContinueInstancePartnerResponse(MessageExchangeDAO mexDao, String channel) {
+ BpelInstanceWorker worker = _instanceWorkerCache.get(mexDao.getInstance().getInstanceId());
+ assert worker.isWorkerThread();
+
+ BpelRuntimeContextImpl brc = new BpelRuntimeContextImpl(worker, mexDao.getInstance());
+ brc.injectPartnerResponse(mexDao.getMessageExchangeId(), channel);
+ brc.execute();
+
+ }
+
private void enqueueInstanceWork(Long instanceId, Runnable runnable) {
BpelInstanceWorker iworker = _instanceWorkerCache.get(instanceId);
iworker.enqueue(runnable);
@@ -428,31 +488,13 @@
__log.debug(ObjectPrinter.stringifyMethodEnter("handleWorkEvent", new Object[] { "jobInfo", jobInfo }));
}
- // Process-level events
- if (we.getType().equals(WorkEvent.Type.MYROLE_INVOKE)) {
- // second stage of my-role invoke for BLOCKING/ASYNC/RELIABLE invocation style.
- if (__log.isDebugEnabled()) {
- __log.debug("InvokeInternal event for mexid " + we.getMexId());
+ enqueueInstanceTransaction(we.getIID(), new Runnable() {
+ public void run() {
+ _contexts.scheduler.jobCompleted(jobInfo.jobName);
+ execInstanceEvent(we);
}
- enqueueTransaction(new Callable<Void>() {
- public Void call() {
- _contexts.scheduler.jobCompleted(jobInfo.jobName);
- MessageExchangeDAO mexdao = loadMexDao(we.getMexId());
- invokeProcess(mexdao);
- return null;
- }
- });
-
- } else /* instance-level events */{
- enqueueInstanceTransaction(we.getIID(), new Runnable() {
- public void run() {
- _contexts.scheduler.jobCompleted(jobInfo.jobName);
- execInstanceEvent(we);
- }
-
- });
- }
+ });
}
@@ -472,6 +514,8 @@
assert worker.isWorkerThread();
ProcessInstanceDAO instanceDAO = getProcessDAO().getInstance(we.getIID());
+ MessageExchangeDAO mexDao = we.getMexId() == null ? null : loadMexDao(we.getMexId());
+
if (instanceDAO == null) {
if (__log.isDebugEnabled()) {
__log.debug("handleWorkEvent: no ProcessInstance found with iid " + we.getIID() + "; ignoring.");
@@ -479,32 +523,26 @@
return;
}
- BpelRuntimeContextImpl brc = new BpelRuntimeContextImpl(worker, instanceDAO);
+ if (__log.isDebugEnabled()) {
+ __log.debug("handleWorkEvent: " + we.getType() + " event for process instance " + we.getIID());
+ }
+
switch (we.getType()) {
+ case MYROLE_INVOKE:
+ executeContinueInstanceMyRoleRequestReceived(mexDao);
+ break;
case TIMER:
- if (__log.isDebugEnabled()) {
- __log.debug("handleWorkEvent: TimerWork event for process instance " + brc);
- }
- brc.timerEvent(we.getChannel());
+ executeContinueInstanceTimerReceived(instanceDAO, we.getChannel());
break;
case RESUME:
- if (__log.isDebugEnabled()) {
- __log.debug("handleWorkEvent: ResumeWork event for iid " + we.getIID());
- }
- brc.execute();
+ executeContinueInstanceResume(instanceDAO);
break;
case PARTNER_RESPONSE:
- if (__log.isDebugEnabled()) {
- __log.debug("InvokeResponse event for iid " + we.getIID());
- }
- brc.injectPartnerResponse(we.getMexId(), we.getChannel());
- brc.execute();
+ executeContinueInstancePartnerResponse(mexDao, we.getChannel());
break;
case MATCHER:
- if (__log.isDebugEnabled()) {
- __log.debug("Matcher event for iid " + we.getIID());
- }
- brc.matcherEvent(we.getCorrelatorId(), we.getCorrelationKey());
+ executeContinueInstanceMatcherEvent(instanceDAO, we.getCorrelatorId(), we.getCorrelationKey());
+ break;
}
}
@@ -1139,9 +1177,9 @@
}
public void scheduleWorkEvent(WorkEvent we, Date timeToFire) {
-// if (isInMemory())
-// throw new InvalidProcessException("In-mem process execution resulted in event scheduling.");
-
+ // if (isInMemory())
+ // throw new InvalidProcessException("In-mem process execution resulted in event scheduling.");
+
_contexts.scheduler.schedulePersistedJob(we.getDetail(), timeToFire);
}
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?view=diff&rev=563962&r1=563961&r2=563962
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Wed Aug 8 10:52:37 2007
@@ -354,7 +354,7 @@
for (int i = 0; i < correlators.size(); ++i) {
CorrelatorDAO ci = correlators.get(i);
if (ci.equals(_dao.getInstantiatingCorrelator())) {
- inputMsgMatch(pickResponseChannelStr, i, _instantiatingMessageExchange);
+ injectMyRoleMessageExchange(pickResponseChannelStr, i, _instantiatingMessageExchange);
if (BpelProcess.__log.isDebugEnabled()) {
BpelProcess.__log.debug("SELECT: " + pickResponseChannel + ": FOUND match for NEW instance mexRef="
+ _instantiatingMessageExchange);
@@ -924,7 +924,7 @@
_instanceWorker.setCachedState(newcount, _soup);
}
- void inputMsgMatch(final String responsechannel, final int idx, MessageExchangeDAO mexdao) {
+ void injectMyRoleMessageExchange(final String responsechannel, final int idx, MessageExchangeDAO mexdao) {
// if we have a message match, this instance should be marked
// active if it isn't already
if (_dao.getState() == ProcessState.STATE_READY) {
@@ -954,7 +954,7 @@
});
}
- void timerEvent(final String timerResponseChannel) {
+ boolean injectTimerEvent(final String timerResponseChannel) {
// In case this is a pick event, we remove routes,
// and cancel the outstanding requests.
_dao.getProcess().removeRoutes(timerResponseChannel, _dao);
@@ -962,7 +962,7 @@
// Ignore timer events after the process is finished.
if (ProcessState.isFinished(_dao.getState())) {
- return;
+ return false;
}
_vpu.inject(new JacobRunnable() {
@@ -973,7 +973,8 @@
responseChannel.onTimeout();
}
});
- execute();
+
+ return true;
}
public void cancel(final TimerResponseChannel timerResponseChannel) {
@@ -1308,7 +1309,7 @@
* Attempt to match message exchanges on a correlator.
*
*/
- void matcherEvent(String correlatorId, CorrelationKey ckey) {
+ boolean matcherEvent(String correlatorId, CorrelationKey ckey) {
if (BpelProcess.__log.isDebugEnabled()) {
__log.debug("MatcherEvent handling: correlatorId=" + correlatorId + ", ckey=" + ckey);
}
@@ -1320,7 +1321,7 @@
if (mroute == null) {
// Ok, this means that a message arrived before we did, so nothing to do.
__log.debug("MatcherEvent handling: nothing to do, route no longer in DB");
- return;
+ return false;
}
// Now see if there is a message that matches this selector.
@@ -1336,12 +1337,14 @@
BpelProcess.__log.debug("SELECT: " + mroute.getGroupId() + ": matched to MESSAGE " + mexdao + " on CKEY " + ckey);
}
- inputMsgMatch(mroute.getGroupId(), mroute.getIndex(), mexdao);
- execute();
+ injectMyRoleMessageExchange(mroute.getGroupId(), mroute.getIndex(), mexdao);
+ return true;
} else {
__log.debug("MatcherEvent handling: nothing to do, no matching message in DB");
}
+
+ return false;
}
private void scheduleReliableResponse(MessageExchangeDAO messageExchange) {
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java?view=diff&rev=563962&r1=563961&r2=563962
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java Wed Aug 8 10:52:37 2007
@@ -1,5 +1,6 @@
package org.apache.ode.bpel.engine;
+import java.util.Date;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
@@ -126,14 +127,7 @@
we.setProcessId(_process.getPID());
we.setMexId(_mexId);
- // Schedule a timeout
- final WorkEvent we1 = new WorkEvent();
- we1.setType(WorkEvent.Type.MYROLE_INVOKE_TIMEOUT);
- we1.setProcessId(_process.getPID());
- we1.setMexId(_mexId);
-
_contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
- _contexts.scheduler.schedulePersistedJob(we1.getDetail(), null);
}
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java?view=diff&rev=563962&r1=563961&r2=563962
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/UnreliableMyRoleMessageExchangeImpl.java Wed Aug 8 10:52:37 2007
@@ -56,10 +56,7 @@
request();
MessageExchangeDAO dao = _process.createMessageExchange(getMessageExchangeId(), MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
save(dao);
- if (_process.isInMemory())
- _process.invokeProcess(dao);
- else
- scheduleInvoke();
+ _process.invokeProcess(dao);
return null;
}
Modified: ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java
URL: http://svn.apache.org/viewvc/ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java?view=diff&rev=563962&r1=563961&r2=563962
==============================================================================
--- ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java (original)
+++ ode/branches/bart/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java Wed Aug 8 10:52:37 2007
@@ -59,6 +59,10 @@
return _jobDetail;
}
+ public String toString() {
+ return "WorkEvent" + _jobDetail;
+ }
+
public enum Type {
TIMER,
@@ -72,8 +76,7 @@
/** Invoke a "my role" operation (i.e. implemented by the process). */
MYROLE_INVOKE,
- /** Timer event for "my role" invocations that are taking too long. */
- MYROLE_INVOKE_TIMEOUT, MYROLE_INVOKE_ASYNC_RESPONSE
+ MYROLE_INVOKE_ASYNC_RESPONSE
}
public String getChannel() {