You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by rr...@apache.org on 2010/07/13 15:27:54 UTC

svn commit: r963709 - in /ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel: engine/BpelProcess.java engine/BpelRuntimeContextImpl.java engine/MessageExchangeImpl.java engine/PartnerLinkMyRoleImpl.java runtime/BpelRuntimeContext.java

Author: rr
Date: Tue Jul 13 13:27:53 2010
New Revision: 963709

URL: http://svn.apache.org/viewvc?rev=963709&view=rev
Log:
ODE-556: Rejecting in-out operations immediately when there's no route found - fixed P2P case

Modified:
    ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
    ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
    ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
    ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
    ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/BpelRuntimeContext.java

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?rev=963709&r1=963708&r2=963709&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Tue Jul 13 13:27:53 2010
@@ -31,6 +31,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import javax.wsdl.Fault;
 import javax.xml.namespace.QName;
 
 import org.apache.commons.logging.Log;
@@ -40,8 +41,11 @@ import org.apache.ode.bpel.common.FaultE
 import org.apache.ode.bpel.common.ProcessState;
 import org.apache.ode.bpel.dao.BpelDAOConnection;
 import org.apache.ode.bpel.dao.DeferredProcessInstanceCleanable;
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
 import org.apache.ode.bpel.dao.ProcessDAO;
 import org.apache.ode.bpel.dao.ProcessInstanceDAO;
+import org.apache.ode.bpel.engine.BpelProcess;
+import org.apache.ode.bpel.engine.MyRoleMessageExchangeImpl;
 import org.apache.ode.bpel.engine.extvar.ExternalVariableConf;
 import org.apache.ode.bpel.engine.extvar.ExternalVariableManager;
 import org.apache.ode.bpel.evt.ProcessInstanceEvent;
@@ -50,10 +54,13 @@ import org.apache.ode.bpel.explang.Evalu
 import org.apache.ode.bpel.iapi.BpelEngineException;
 import org.apache.ode.bpel.iapi.Endpoint;
 import org.apache.ode.bpel.iapi.EndpointReference;
+import org.apache.ode.bpel.iapi.Message;
 import org.apache.ode.bpel.iapi.MessageExchange;
 import org.apache.ode.bpel.iapi.PartnerRoleChannel;
+import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
 import org.apache.ode.bpel.iapi.ProcessConf;
 import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.bpel.iapi.MessageExchange.Status;
 import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY;
 import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
 import org.apache.ode.bpel.iapi.Scheduler.JobType;
@@ -66,6 +73,7 @@ import org.apache.ode.bpel.o.OMessageVar
 import org.apache.ode.bpel.o.OPartnerLink;
 import org.apache.ode.bpel.o.OProcess;
 import org.apache.ode.bpel.o.Serializer;
+import org.apache.ode.bpel.runtime.BpelRuntimeContext;
 import org.apache.ode.bpel.runtime.ExpressionLanguageRuntimeRegistry;
 import org.apache.ode.bpel.runtime.InvalidProcessException;
 import org.apache.ode.bpel.runtime.PROCESS;
@@ -1157,4 +1165,59 @@ public class BpelProcess {
     public int getVersion() {
         return Integer.parseInt(_pid.getLocalPart().substring(_pid.getLocalPart().lastIndexOf('-') + 1));
     }
+    
+    public void doAsyncReply(MyRoleMessageExchangeImpl m, BpelRuntimeContext context) { // Delivers m's outcome: to the piped partner-role mex (p2p) when one is found, otherwise via mexContext.onAsyncReply; context may be null.
+        MessageExchangeDAO mex = m.getDAO();
+        PartnerRoleMessageExchange pmex = null;
+        // A non-null piped mex id marks a p2p exchange; the partner-role side is looked up through the engine.
+        if (mex.getPipedMessageExchangeId() != null) {
+            pmex = (PartnerRoleMessageExchange) getEngine().getMessageExchange(mex.getPipedMessageExchangeId());
+        }
+        // pmex stays null when no piped id is set (or when the engine lookup returns null); such replies take the final else branch.
+        if (pmex != null) {
+            if (BpelProcess.__log.isDebugEnabled()) {
+                __log.debug("Replying to a p2p mex, myrole " + m + " - partnerole " + pmex);
+            }
+
+            if (pmex.getStatus() == Status.ASYNC || pmex.getStatus() == Status.REQUEST) { // reply only when the partner mex status is ASYNC or REQUEST
+                try {
+                    switch (m.getStatus()) {
+                        case FAILURE:
+                            // We can't seem to get the failure out of the myrole mex?
+                            pmex.replyWithFailure(MessageExchange.FailureType.OTHER, "operation failed", null);
+                            break;
+                        case FAULT: // look up the fault on the partner operation by the local part of m's fault name
+                            Fault fault = pmex.getOperation().getFault(m.getFault().getLocalPart());
+                            if (fault == null) { // not declared on the partner operation: log and reply with a failure instead. NOTE(review): m.getFaultResponse() is dereferenced below without a null check - confirm it is always set here
+                                __log.error("process " + this + " instance " + (context != null ? context.getPid() : null) + " thrown unmapped fault in p2p communication " + m.getFault() + " " + m.getFaultExplanation() + " - converted to failure");
+                                pmex.replyWithFailure(MessageExchange.FailureType.OTHER, "process thrown unmapped fault in p2p communication " + m.getFault() + " " + m.getFaultExplanation() + " - converted to failure", m.getFaultResponse().getMessage());
+                            } else { // NOTE(review): the lookup below repeats the one already held in 'fault'
+                                Message faultRes = pmex.createMessage(pmex.getOperation().getFault(m.getFault().getLocalPart())
+                                        .getMessage().getQName());
+                                faultRes.setMessage(m.getResponse().getMessage()); // NOTE(review): reads getResponse() while the branch above reads getFaultResponse() - confirm both yield the fault payload
+                                pmex.replyWithFault(m.getFault(), faultRes);
+                            }
+                            break;
+                        case RESPONSE: // copy the my-role response into a message created with the partner operation's output message QName
+                            Message response = pmex.createMessage(pmex.getOperation().getOutput().getMessage().getQName());
+                            response.setMessage(m.getResponse().getMessage());
+                            pmex.reply(response);
+                            break;
+                        default:
+                            __log.warn("Unexpected state: " + m.getStatus());
+                            break;
+                    }
+                } finally { // runs whether or not the reply threw: release the my-role DAO, passing the MESSAGES cleanup flag computed from (status == RESPONSE)
+                    mex.release(this.isCleanupCategoryEnabled(m.getStatus() == MessageExchange.Status.RESPONSE, CLEANUP_CATEGORY.MESSAGES));
+                }
+            } else { // partner mex in any other status: nothing is sent, only a warning is logged
+                __log.warn("Can't send response to a p2p mex: " + mex + " partner mex: " + pmex);
+            }
+        } else { // no partner-role mex found: hand the reply to mexContext
+            if (context != null) context.checkInvokeExternalPermission(); // skipped when no runtime context was supplied
+            this._engine._contexts.mexContext.onAsyncReply(m);
+            //mex.release(_bpelProcess.isCleanupCategoryEnabled(m.getStatus() == MessageExchange.Status.RESPONSE, CLEANUP_CATEGORY.MESSAGES));
+        }
+    }
+    
 }

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?rev=963709&r1=963708&r2=963709&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Tue Jul 13 13:27:53 2010
@@ -529,61 +529,6 @@ public class BpelRuntimeContextImpl impl
         }
     }
 
-    protected void doAsyncReply(MyRoleMessageExchangeImpl m) {
-        MessageExchangeDAO mex = m.getDAO();
-        PartnerRoleMessageExchange pmex = null;
-
-        if (mex.getPipedMessageExchangeId() != null) {
-            pmex = (PartnerRoleMessageExchange) _bpelProcess
-                    .getEngine().getMessageExchange(mex.getPipedMessageExchangeId());
-        }
-
-        if (pmex != null) {
-            if (BpelProcess.__log.isDebugEnabled()) {
-                __log.debug("Replying to a p2p mex, myrole " + m + " - partnerole " + pmex);
-            }
-
-            if (pmex.getStatus() == Status.ASYNC || pmex.getStatus() == Status.REQUEST) {
-                try {
-                    switch (m.getStatus()) {
-                        case FAILURE:
-                            // We can't seem to get the failure out of the myrole mex?
-                            pmex.replyWithFailure(MessageExchange.FailureType.OTHER, "operation failed", null);
-                            break;
-                        case FAULT:
-                            Fault fault = pmex.getOperation().getFault(m.getFault().getLocalPart());
-                            if (fault == null) {
-                                __log.error("process " + _bpelProcess + " instance " + _iid + " thrown unmapped fault in p2p communication " + m.getFault() + " " + m.getFaultExplanation() + " - converted to failure");
-                                pmex.replyWithFailure(MessageExchange.FailureType.OTHER, "process thrown unmapped fault in p2p communication " + m.getFault() + " " + m.getFaultExplanation() + " - converted to failure", m.getFaultResponse().getMessage());
-                            } else {
-                                Message faultRes = pmex.createMessage(pmex.getOperation().getFault(m.getFault().getLocalPart())
-                                        .getMessage().getQName());
-                                faultRes.setMessage(m.getResponse().getMessage());
-                                pmex.replyWithFault(m.getFault(), faultRes);
-                            }
-                            break;
-                        case RESPONSE:
-                            Message response = pmex.createMessage(pmex.getOperation().getOutput().getMessage().getQName());
-                            response.setMessage(m.getResponse().getMessage());
-                            pmex.reply(response);
-                            break;
-                        default:
-                            __log.warn("Unexpected state: " + m.getStatus());
-                            break;
-                    }
-                } finally {
-                    mex.release(_bpelProcess.isCleanupCategoryEnabled(m.getStatus() == MessageExchange.Status.RESPONSE, CLEANUP_CATEGORY.MESSAGES));
-                }
-            } else {
-                __log.warn("Can't send response to a p2p mex: " + mex + " partner mex: " + pmex);
-            }
-        } else {
-            checkInvokeExternalPermission();
-            _bpelProcess._engine._contexts.mexContext.onAsyncReply(m);
-            //mex.release(_bpelProcess.isCleanupCategoryEnabled(m.getStatus() == MessageExchange.Status.RESPONSE, CLEANUP_CATEGORY.MESSAGES));
-        }
-    }
-
     public void reply(final PartnerLinkInstance plinkInstnace, final String opName, final String mexId, Element msg,
             QName fault) throws FaultException {
         String mexRef = _imaManager.release(plinkInstnace, opName, mexId);
@@ -620,7 +565,7 @@ public class BpelRuntimeContextImpl impl
             evt.setAspect(ProcessMessageExchangeEvent.PROCESS_OUTPUT);
         }
 
-        doAsyncReply(m);
+        _bpelProcess.doAsyncReply(m, this);
 
         // send event
         sendEvent(evt);
@@ -1211,7 +1156,7 @@ public class BpelRuntimeContextImpl impl
                         }
                     default:
                         mex.setFailure(FailureType.OTHER, "No response.", null);
-                        doAsyncReply(mex);
+                        _bpelProcess.doAsyncReply(mex, this);
                 }
             }
         }
@@ -1232,7 +1177,7 @@ public class BpelRuntimeContextImpl impl
 
                 mex.setFault(faultData.getFaultName(), message);
                 mex.setFaultExplanation(faultData.getExplanation());
-                doAsyncReply(mex);
+                _bpelProcess.doAsyncReply(mex, this);
             }
         }
     }
@@ -1245,7 +1190,7 @@ public class BpelRuntimeContextImpl impl
                 MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(_bpelProcess, _bpelProcess._engine, mexDao);
                 _bpelProcess.initMyRoleMex(mex);
                 mex.setFailure(FailureType.OTHER, "No response.", null);
-                doAsyncReply(mex);
+                _bpelProcess.doAsyncReply(mex, this);
             }
         }
     }

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java?rev=963709&r1=963708&r2=963709&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MessageExchangeImpl.java Tue Jul 13 13:27:53 2010
@@ -182,6 +182,7 @@ abstract class MessageExchangeImpl imple
         // TODO not using FailureType, nor details
         setStatus(Status.FAILURE);
         getDAO().setFaultExplanation(reason);
+        responseReceived();
     }
 
     void setStatus(Status status) {

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java?rev=963709&r1=963708&r2=963709&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java Tue Jul 13 13:27:53 2010
@@ -258,7 +258,7 @@ public class PartnerLinkMyRoleImpl exten
         if (!mex.isAsynchronous()) {
             mex.setFailure(MessageExchange.FailureType.NOMATCH, "No process instance matching correlation keys.", null);
             if (!OdeGlobalConfig.queueInOutMessages()) {
-                _process._engine._contexts.mexContext.onAsyncReply(mex);
+                _process.doAsyncReply(mex, null);
             }
         } else {
             // enqueue message with the last message route, as per the comments in caller (@see BpelProcess.invokeProcess())

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/BpelRuntimeContext.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/BpelRuntimeContext.java?rev=963709&r1=963708&r2=963709&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/BpelRuntimeContext.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/BpelRuntimeContext.java Tue Jul 13 13:27:53 2010
@@ -309,4 +309,6 @@ public interface BpelRuntimeContext {
     Date getCurrentEventDateTime();
 
     ClassLoader getProcessClassLoader();
+    
+    void checkInvokeExternalPermission();
 }