You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by mr...@apache.org on 2008/05/15 02:02:05 UTC
svn commit: r656469 - in /ode/trunk:
axis2-war/src/test/java/org/apache/ode/axis2/
bpel-runtime/src/main/java/org/apache/ode/bpel/engine/
Author: mriou
Date: Wed May 14 17:02:04 2008
New Revision: 656469
URL: http://svn.apache.org/viewvc?rev=656469&view=rev
Log:
Fixed req-only P2P with a semantic closer to reliable invoke.
Modified:
ode/trunk/axis2-war/src/test/java/org/apache/ode/axis2/TestSimpleScenario.java
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
Modified: ode/trunk/axis2-war/src/test/java/org/apache/ode/axis2/TestSimpleScenario.java
URL: http://svn.apache.org/viewvc/ode/trunk/axis2-war/src/test/java/org/apache/ode/axis2/TestSimpleScenario.java?rev=656469&r1=656468&r2=656469&view=diff
==============================================================================
--- ode/trunk/axis2-war/src/test/java/org/apache/ode/axis2/TestSimpleScenario.java (original)
+++ ode/trunk/axis2-war/src/test/java/org/apache/ode/axis2/TestSimpleScenario.java Wed May 14 17:02:04 2008
@@ -31,6 +31,7 @@
String response = server.sendRequestFile("http://localhost:8080/ode/processes/MSMainExecuteService",
bundleName, "testRequest.soap");
+ System.out.println("->" + response);
assertTrue(response.indexOf("OK") > 0);
} finally {
server.undeployProcess(bundleName);
Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?rev=656469&r1=656468&r2=656469&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Wed May 14 17:02:04 2008
@@ -293,7 +293,7 @@
// conditions for deadlock in the correlation tables. However if invocation style is transacted,
// we need to do the work right then and there.
- if (istyle == InvocationStyle.TRANSACTED || istyle == InvocationStyle.P2P) {
+ if (istyle == InvocationStyle.TRANSACTED) {
doInstanceWork(mexdao.getInstance().getInstanceId(), new Callable<Void>() {
public Void call() {
executeContinueInstanceMyRoleRequestReceived(mexdao);
@@ -1021,15 +1021,13 @@
* @param runnable
*/
void scheduleRunnable(final Runnable runnable) {
- if (__log.isDebugEnabled())
- __log.debug("schedulingRunnable for process " + _pid + ": " + runnable);
+ if (__log.isDebugEnabled()) __log.debug("schedulingRunnable for process " + _pid + ": " + runnable);
_server.scheduleRunnable(new ProcessRunnable(runnable));
}
void enqueueRunnable(BpelInstanceWorker worker) {
- if (__log.isDebugEnabled())
- __log.debug("enqueuRunnable for process " + _pid + ": " + worker);
+ if (__log.isDebugEnabled()) __log.debug("enqueuRunnable for process " + _pid + ": " + worker);
_server.enqueueRunnable(new ProcessRunnable(worker));
}
@@ -1040,7 +1038,6 @@
final String mexId = new GUID().toString();
_hydrationLatch.latch(1);
try {
-
final PartnerLinkMyRoleImpl target = getPartnerLinkForService(targetService);
if (target == null)
throw new BpelEngineException("NoSuchService: " + targetService);
@@ -1056,31 +1053,21 @@
}
void onMyRoleMexAck(MessageExchangeDAO mexdao, Status old) {
-
if (mexdao.getPipedMessageExchangeId() != null) /* p2p */{
-
BpelProcess caller = _server.getBpelProcess(mexdao.getPipedPID());
- if (caller == null) {
- // process no longer deployed....
-
- return;
- }
+ // process no longer deployed....
+ if (caller == null) return;
MessageExchangeDAO pmex = caller.loadMexDao(mexdao.getPipedMessageExchangeId());
- if (pmex == null) {
- // Mex no longer there.... odd..
-
- return;
- }
+ // Mex no longer there.... odd..
+ if (pmex == null) return;
// Need to copy the response and state from myrolemex --> partnerrolemex
-
boolean compat = !(caller.isInMemory() ^ isInMemory());
if (compat) {
// both processes are in-mem or both are persisted, can share the message
pmex.setResponse(mexdao.getResponse());
} else /* one process in-mem, other persisted */{
-
MessageDAO presponse = pmex.createMessage(mexdao.getResponse().getType());
presponse.setData(mexdao.getResponse().getData());
pmex.setResponse(presponse);
@@ -1089,9 +1076,7 @@
pmex.setAckType(mexdao.getAckType());
pmex.setFailureType(mexdao.getFailureType());
- if (old == Status.ASYNC)
- caller.p2pWakeup(pmex);
-
+ if (old == Status.ASYNC) caller.p2pWakeup(pmex);
} else /* not p2p */{
// Do an Async wakeup if we are in the ASYNC state. If we're not, we'll pick up the ACK when we unwind
// the stack.
@@ -1105,9 +1090,7 @@
__log.error("Integration layer threw an unexepcted exception.", t);
}
}
-
}
-
}
class ProcessRunnable implements Runnable {
@@ -1116,7 +1099,6 @@
ProcessRunnable(Runnable work) {
_work = work;
}
-
public void run() {
_hydrationLatch.latch(1);
try {
@@ -1124,9 +1106,7 @@
} finally {
_hydrationLatch.release(1);
}
-
}
-
}
class ProcessCallable<T> implements Callable<T> {
@@ -1321,14 +1301,13 @@
* @param partnerRoleMex
*/
private void invokeP2P(BpelProcess target, QName serviceName, Operation operation, MessageExchangeDAO partnerRoleMex) {
- if (BpelProcess.__log.isDebugEnabled()) {
- __log
- .debug("Invoking in a p2p interaction, partnerrole " + partnerRoleMex.getMessageExchangeId() + " target="
- + target);
- }
+ if (BpelProcess.__log.isDebugEnabled())
+ __log.debug("Invoking in a p2p interaction, partnerrole " + partnerRoleMex.getMessageExchangeId()
+ + " target=" + target);
partnerRoleMex.setInvocationStyle(InvocationStyle.P2P);
+ // Plumbing
MessageExchangeDAO myRoleMex = target.createMessageExchange(new GUID().toString(),
MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
myRoleMex.setStatus(Status.REQ);
@@ -1350,10 +1329,9 @@
String mySessionId = partnerRoleMex.getPartnerLink().getMySessionId();
String partnerSessionId = partnerRoleMex.getPartnerLink().getPartnerSessionId();
- if (BpelProcess.__log.isDebugEnabled()) {
- __log.debug("Setting myRoleMex session ids for p2p interaction, mySession " + partnerSessionId + " - partnerSess "
- + mySessionId);
- }
+ if (BpelProcess.__log.isDebugEnabled())
+ __log.debug("Setting myRoleMex session ids for p2p interaction, mySession " + partnerSessionId
+ + " - partnerSess " + mySessionId);
if (mySessionId != null)
partnerRoleMex.setProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID, mySessionId);
@@ -1368,8 +1346,10 @@
if (__log.isDebugEnabled())
__log.debug("INVOKE PARTNER (SEP): sessionId=" + mySessionId + " partnerSessionId=" + partnerSessionId);
+ // A classic P2P interaction is considered reliable. The invocation should take place
+ // in the local transaction but the invoked process is not supposed to hold our thread
+ // and the reply should come in a separate transaction.
target.invokeProcess(myRoleMex);
-
}
/**
Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?rev=656469&r1=656468&r2=656469&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Wed May 14 17:02:04 2008
@@ -710,7 +710,7 @@
// we need to inject a message on the response channel, so that the process continues.
switch (mexDao.getStatus()) {
case ACK:
- injectPartnerResponse(mexDao.getMessageExchangeId(), mexDao.getChannel());
+ if (mexDao.getChannel() != null) injectPartnerResponse(mexDao.getMessageExchangeId(), mexDao.getChannel());
break;
case ASYNC:
// we'll have to wait for the response.