You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by mr...@apache.org on 2008/05/15 02:02:05 UTC

svn commit: r656469 - in /ode/trunk: axis2-war/src/test/java/org/apache/ode/axis2/ bpel-runtime/src/main/java/org/apache/ode/bpel/engine/

Author: mriou
Date: Wed May 14 17:02:04 2008
New Revision: 656469

URL: http://svn.apache.org/viewvc?rev=656469&view=rev
Log:
Fixed request-only P2P invocation so that its semantics are closer to those of a reliable invoke.

Modified:
    ode/trunk/axis2-war/src/test/java/org/apache/ode/axis2/TestSimpleScenario.java
    ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
    ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java

Modified: ode/trunk/axis2-war/src/test/java/org/apache/ode/axis2/TestSimpleScenario.java
URL: http://svn.apache.org/viewvc/ode/trunk/axis2-war/src/test/java/org/apache/ode/axis2/TestSimpleScenario.java?rev=656469&r1=656468&r2=656469&view=diff
==============================================================================
--- ode/trunk/axis2-war/src/test/java/org/apache/ode/axis2/TestSimpleScenario.java (original)
+++ ode/trunk/axis2-war/src/test/java/org/apache/ode/axis2/TestSimpleScenario.java Wed May 14 17:02:04 2008
@@ -31,6 +31,7 @@
             String response = server.sendRequestFile("http://localhost:8080/ode/processes/MSMainExecuteService",
                     bundleName, "testRequest.soap");
 
+            System.out.println("->" + response);
             assertTrue(response.indexOf("OK") > 0);
         } finally {
             server.undeployProcess(bundleName);

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?rev=656469&r1=656468&r2=656469&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Wed May 14 17:02:04 2008
@@ -293,7 +293,7 @@
                 // conditions for deadlock in the correlation tables. However if invocation style is transacted,
                 // we need to do the work right then and there.
 
-                if (istyle == InvocationStyle.TRANSACTED || istyle == InvocationStyle.P2P) {
+                if (istyle == InvocationStyle.TRANSACTED) {
                     doInstanceWork(mexdao.getInstance().getInstanceId(), new Callable<Void>() {
                         public Void call() {
                             executeContinueInstanceMyRoleRequestReceived(mexdao);
@@ -1021,15 +1021,13 @@
      * @param runnable
      */
     void scheduleRunnable(final Runnable runnable) {
-        if (__log.isDebugEnabled())
-            __log.debug("schedulingRunnable for process " + _pid + ": " + runnable);
+        if (__log.isDebugEnabled()) __log.debug("schedulingRunnable for process " + _pid + ": " + runnable);
 
         _server.scheduleRunnable(new ProcessRunnable(runnable));
     }
 
     void enqueueRunnable(BpelInstanceWorker worker) {
-        if (__log.isDebugEnabled())
-            __log.debug("enqueuRunnable for process " + _pid + ": " + worker);
+        if (__log.isDebugEnabled()) __log.debug("enqueuRunnable for process " + _pid + ": " + worker);
 
         _server.enqueueRunnable(new ProcessRunnable(worker));
     }
@@ -1040,7 +1038,6 @@
         final String mexId = new GUID().toString();
         _hydrationLatch.latch(1);
         try {
-
             final PartnerLinkMyRoleImpl target = getPartnerLinkForService(targetService);
             if (target == null)
                 throw new BpelEngineException("NoSuchService: " + targetService);
@@ -1056,31 +1053,21 @@
     }
 
     void onMyRoleMexAck(MessageExchangeDAO mexdao, Status old) {
-
         if (mexdao.getPipedMessageExchangeId() != null) /* p2p */{
-
             BpelProcess caller = _server.getBpelProcess(mexdao.getPipedPID());
-            if (caller == null) {
-                // process no longer deployed....
-
-                return;
-            }
+            // process no longer deployed....
+            if (caller == null) return;
 
             MessageExchangeDAO pmex = caller.loadMexDao(mexdao.getPipedMessageExchangeId());
-            if (pmex == null) {
-                // Mex no longer there.... odd..
-
-                return;
-            }
+            // Mex no longer there.... odd..
+            if (pmex == null) return;
 
             // Need to copy the response and state from myrolemex --> partnerrolemex
-
             boolean compat = !(caller.isInMemory() ^ isInMemory());
             if (compat) {
                 // both processes are in-mem or both are persisted, can share the message
                 pmex.setResponse(mexdao.getResponse());
             } else /* one process in-mem, other persisted */{
-
                 MessageDAO presponse = pmex.createMessage(mexdao.getResponse().getType());
                 presponse.setData(mexdao.getResponse().getData());
                 pmex.setResponse(presponse);
@@ -1089,9 +1076,7 @@
             pmex.setAckType(mexdao.getAckType());
             pmex.setFailureType(mexdao.getFailureType());
 
-            if (old == Status.ASYNC)
-                caller.p2pWakeup(pmex);
-
+            if (old == Status.ASYNC) caller.p2pWakeup(pmex);
         } else /* not p2p */{
             // Do an Async wakeup if we are in the ASYNC state. If we're not, we'll pick up the ACK when we unwind
             // the stack.
@@ -1105,9 +1090,7 @@
                     __log.error("Integration layer threw an unexepcted exception.", t);
                 }
             }
-
         }
-
     }
 
     class ProcessRunnable implements Runnable {
@@ -1116,7 +1099,6 @@
         ProcessRunnable(Runnable work) {
             _work = work;
         }
-
         public void run() {
             _hydrationLatch.latch(1);
             try {
@@ -1124,9 +1106,7 @@
             } finally {
                 _hydrationLatch.release(1);
             }
-
         }
-
     }
 
     class ProcessCallable<T> implements Callable<T> {
@@ -1321,14 +1301,13 @@
      * @param partnerRoleMex
      */
     private void invokeP2P(BpelProcess target, QName serviceName, Operation operation, MessageExchangeDAO partnerRoleMex) {
-        if (BpelProcess.__log.isDebugEnabled()) {
-            __log
-                    .debug("Invoking in a p2p interaction, partnerrole " + partnerRoleMex.getMessageExchangeId() + " target="
-                            + target);
-        }
+        if (BpelProcess.__log.isDebugEnabled())
+            __log.debug("Invoking in a p2p interaction, partnerrole " + partnerRoleMex.getMessageExchangeId()
+                    + " target=" + target);
 
         partnerRoleMex.setInvocationStyle(InvocationStyle.P2P);
 
+        // Plumbing
         MessageExchangeDAO myRoleMex = target.createMessageExchange(new GUID().toString(),
                 MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
         myRoleMex.setStatus(Status.REQ);
@@ -1350,10 +1329,9 @@
         String mySessionId = partnerRoleMex.getPartnerLink().getMySessionId();
         String partnerSessionId = partnerRoleMex.getPartnerLink().getPartnerSessionId();
 
-        if (BpelProcess.__log.isDebugEnabled()) {
-            __log.debug("Setting myRoleMex session ids for p2p interaction, mySession " + partnerSessionId + " - partnerSess "
-                    + mySessionId);
-        }
+        if (BpelProcess.__log.isDebugEnabled())
+            __log.debug("Setting myRoleMex session ids for p2p interaction, mySession " + partnerSessionId
+                    + " - partnerSess " + mySessionId);
 
         if (mySessionId != null)
             partnerRoleMex.setProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID, mySessionId);
@@ -1368,8 +1346,10 @@
         if (__log.isDebugEnabled())
             __log.debug("INVOKE PARTNER (SEP): sessionId=" + mySessionId + " partnerSessionId=" + partnerSessionId);
 
+        // A classic P2P interaction is considered reliable. The invocation should take place
+        // in the local transaction but the invoked process is not supposed to hold our thread
+        // and the reply should come in a separate transaction.
         target.invokeProcess(myRoleMex);
-
     }
 
     /**

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?rev=656469&r1=656468&r2=656469&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Wed May 14 17:02:04 2008
@@ -710,7 +710,7 @@
         // we need to inject a message on the response channel, so that the process continues.
         switch (mexDao.getStatus()) {
         case ACK:
-            injectPartnerResponse(mexDao.getMessageExchangeId(), mexDao.getChannel());
+            if (mexDao.getChannel() != null) injectPartnerResponse(mexDao.getMessageExchangeId(), mexDao.getChannel());
             break;
         case ASYNC:
             // we'll have to wait for the response.