You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by mr...@apache.org on 2007/09/15 01:49:16 UTC

svn commit: r575828 - in /ode/branches/APACHE_ODE_1.1/bpel-runtime/src/main/java/org/apache/ode/bpel/engine: BpelProcess.java BpelServerImpl.java PartnerLinkMyRoleImpl.java

Author: mriou
Date: Fri Sep 14 16:49:12 2007
New Revision: 575828

URL: http://svn.apache.org/viewvc?rev=575828&view=rev
Log:
ODE-182 Allow 2 different partner links on the same service endpoint.

Modified:
    ode/branches/APACHE_ODE_1.1/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
    ode/branches/APACHE_ODE_1.1/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
    ode/branches/APACHE_ODE_1.1/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java

Modified: ode/branches/APACHE_ODE_1.1/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.1/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?rev=575828&r1=575827&r2=575828&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.1/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original)
+++ ode/branches/APACHE_ODE_1.1/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Fri Sep 14 16:49:12 2007
@@ -39,12 +39,7 @@
 import org.apache.ode.bpel.evt.ScopeEvent;
 import org.apache.ode.bpel.explang.ConfigurationException;
 import org.apache.ode.bpel.explang.EvaluationException;
-import org.apache.ode.bpel.iapi.BpelEngineException;
-import org.apache.ode.bpel.iapi.Endpoint;
-import org.apache.ode.bpel.iapi.EndpointReference;
-import org.apache.ode.bpel.iapi.MessageExchange;
-import org.apache.ode.bpel.iapi.PartnerRoleChannel;
-import org.apache.ode.bpel.iapi.ProcessConf;
+import org.apache.ode.bpel.iapi.*;
 import org.apache.ode.bpel.intercept.InterceptorInvoker;
 import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
 import org.apache.ode.bpel.o.OElementVarType;
@@ -59,7 +54,6 @@
 import org.apache.ode.bpel.runtime.channels.FaultData;
 import org.apache.ode.jacob.soup.ReplacementMap;
 import org.apache.ode.utils.ObjectPrinter;
-import org.apache.ode.utils.DOMUtils;
 import org.apache.ode.utils.msg.MessageBundle;
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
@@ -81,34 +75,26 @@
 
     private volatile Map<OPartnerLink, PartnerLinkMyRoleImpl> _myRoles;
 
-    /** Mapping from {"Service Name" (QNAME) / port} to a myrole. */
-    private volatile Map<Endpoint, PartnerLinkMyRoleImpl> _endpointToMyRoleMap;
+    /** Mapping from a myrole to a {"Service Name" (QNAME) / port}. It's actually more a tuple than a map as
+     * it's important to note that the same process with the same endpoint can have 2 different myroles. */
+    private volatile Map<PartnerLinkMyRoleImpl, Endpoint> _endpointToMyRoleMap;
 
     // Backup hashmaps to keep initial endpoints handy after dehydration
     private Map<Endpoint, EndpointReference> _myEprs = new HashMap<Endpoint, EndpointReference>();
-
     private Map<Endpoint, EndpointReference> _partnerEprs = new HashMap<Endpoint, EndpointReference>();
-
     private Map<Endpoint, PartnerRoleChannel> _partnerChannels = new HashMap<Endpoint, PartnerRoleChannel>();
 
     final QName _pid;
-
     private volatile OProcess _oprocess;
-
     // Has the process already been hydrated before?
     private boolean _hydratedOnce = false;
-
     /** Last time the process was used. */
     private volatile long _lastUsed;
 
     BpelEngineImpl _engine;
-
     DebuggerSupport _debugger;
-
     ExpressionLanguageRuntimeRegistry _expLangRuntimeRegistry;
-
     private ReplacementMap _replacementMap;
-
     final ProcessConf _pconf;
 
     /** {@link MessageExchangeInterceptor}s registered for this process. */
@@ -150,8 +136,8 @@
     void invokeProcess(MyRoleMessageExchangeImpl mex) {
         try {
             _hydrationLatch.latch(1);
-            PartnerLinkMyRoleImpl target = getMyRoleForService(mex.getServiceName());
-            if (target == null) {
+            List<PartnerLinkMyRoleImpl> targets = getMyRolesForService(mex.getServiceName());
+            if (targets.isEmpty()) {
                 String errmsg = __msgs.msgMyRoleRoutingFailure(mex.getMessageExchangeId());
                 __log.error(errmsg);
                 mex.setFailure(MessageExchange.FailureType.UNKNOWN_ENDPOINT, errmsg, null);
@@ -166,7 +152,43 @@
             }
 
             markused();
-            target.invokeMyRole(mex);
+
+            // Ideally, if Java supported closure, the routing code would return null or the appropriate
+            // closure to handle the route.
+            PartnerLinkMyRoleImpl.RoutingInfo routing = null;
+            boolean routed = false;
+            for (PartnerLinkMyRoleImpl target : targets) {
+                routing = target.findRoute(mex);
+                boolean createInstance = target.isCreateInstance(mex);
+
+                if (mex.getStatus() != MessageExchange.Status.FAILURE) {
+                    if (routing.messageRoute == null && createInstance) {
+                        // No route but we can create a new instance
+                        target.invokeNewInstance(mex, routing);
+                        routed = true; break;
+                    } else if (routing.messageRoute != null) {
+                        // Found a route, hitting it
+                        target.invokeInstance(mex, routing);
+                        routed = true; break;
+                    }
+                }
+            }
+
+            // Nothing found, saving for later
+            if (!routed) {
+                // TODO this is kind of hackish when no match and more than one myrole is selected.
+                // we save the routing on the last myrole
+                // actually the message queue should be attached to the instance instead of the correlator
+                targets.get(targets.size()-1).noRoutingMatch(mex, routing);
+            }
+
+            // Now we have to update our message exchange status. If the <reply> was not hit during the
+            // invocation, then we will be in the "REQUEST" phase which means that either this was a one-way
+            // or a two-way that needs to delivery the reply asynchronously.
+            if (mex.getStatus() == MessageExchange.Status.REQUEST) {
+                mex.setStatus(MessageExchange.Status.ASYNC);
+            }
+
             markused();
         } finally {
             _hydrationLatch.release(1);
@@ -178,20 +200,27 @@
         }
     }
 
-    private PartnerLinkMyRoleImpl getMyRoleForService(QName serviceName) {
-        for (Map.Entry<Endpoint, PartnerLinkMyRoleImpl> e : getEndpointToMyRoleMap().entrySet()) {
-            if (e.getKey().serviceName.equals(serviceName))
-                return e.getValue();
+    /** Several myroles can use the same service in a given process */
+    private List<PartnerLinkMyRoleImpl> getMyRolesForService(QName serviceName) {
+        List<PartnerLinkMyRoleImpl> myRoles = new ArrayList<PartnerLinkMyRoleImpl>(5);
+        for (Map.Entry<PartnerLinkMyRoleImpl,Endpoint> e : getEndpointToMyRoleMap().entrySet()) {
+            if (e.getValue().serviceName.equals(serviceName))
+                myRoles.add(e.getKey());
         }
-        return null;
+        return myRoles;
     }
 
     void initMyRoleMex(MyRoleMessageExchangeImpl mex) {
         markused();
+
         PartnerLinkMyRoleImpl target = null;
-        for (Endpoint endpoint : getEndpointToMyRoleMap().keySet()) {
-            if (endpoint.serviceName.equals(mex.getServiceName()))
-                target = getEndpointToMyRoleMap().get(endpoint);
+        for (Map.Entry<PartnerLinkMyRoleImpl,Endpoint> e : getEndpointToMyRoleMap().entrySet()) {
+            if (e.getValue().serviceName.equals(mex.getServiceName())) {
+                // First one is fine as we're only interested in the portType and operation here and
+                // even if a process has 2 myrole partner links
+                target = e.getKey();
+                break;
+            }
         }
         if (target != null) {
             mex.setPortOp(target._plinkDef.myRolePortType, target._plinkDef.getMyRoleOperation(mex.getOperationName()));
@@ -350,7 +379,7 @@
     private void setRoles(OProcess oprocess) {
         _partnerRoles = new HashMap<OPartnerLink, PartnerLinkPartnerRoleImpl>();
         _myRoles = new HashMap<OPartnerLink, PartnerLinkMyRoleImpl>();
-        _endpointToMyRoleMap = new HashMap<Endpoint, PartnerLinkMyRoleImpl>();
+        _endpointToMyRoleMap = new HashMap<PartnerLinkMyRoleImpl, Endpoint>();
 
         // Create myRole endpoint name mapping (from deployment descriptor)
         HashMap<OPartnerLink, Endpoint> myRoleEndpoints = new HashMap<OPartnerLink, Endpoint>();
@@ -385,7 +414,7 @@
                     throw new IllegalArgumentException("No service name for myRole plink " + pl.getName());
                 PartnerLinkMyRoleImpl myRole = new PartnerLinkMyRoleImpl(this, pl, endpoint);
                 _myRoles.put(pl, myRole);
-                _endpointToMyRoleMap.put(endpoint, myRole);
+                _endpointToMyRoleMap.put(myRole, endpoint);
             }
 
             if (pl.hasPartnerRole()) {
@@ -561,7 +590,7 @@
         }
     }
 
-    private Map<Endpoint, PartnerLinkMyRoleImpl> getEndpointToMyRoleMap() {
+    private Map<PartnerLinkMyRoleImpl,Endpoint> getEndpointToMyRoleMap() {
         try {
             _hydrationLatch.latch(1);
             return _endpointToMyRoleMap;

Modified: ode/branches/APACHE_ODE_1.1/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.1/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?rev=575828&r1=575827&r2=575828&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.1/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java (original)
+++ ode/branches/APACHE_ODE_1.1/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java Fri Sep 14 16:49:12 2007
@@ -274,7 +274,7 @@
 
             _engine.registerProcess(process);
             _registeredProcesses.add(process);
-            process.hydrate();
+            if (_dehydrationPolicy == null) process.hydrate();
 
             __log.info(__msgs.msgProcessRegistered(conf.getProcessId()));
         } finally {

Modified: ode/branches/APACHE_ODE_1.1/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.1/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java?rev=575828&r1=575827&r2=575828&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.1/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java (original)
+++ ode/branches/APACHE_ODE_1.1/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java Fri Sep 14 16:49:12 2007
@@ -77,13 +77,12 @@
         return buf.toString();
     }
 
-    /**
-     * Called when an input message has been received.
-     *
-     * @param mex
-     *            exchange to which the message is related
-     */
-    public void invokeMyRole(MyRoleMessageExchangeImpl mex) {
+    public boolean isCreateInstance(MyRoleMessageExchangeImpl mex) {
+        Operation operation = getMyRoleOperation(mex.getOperationName());
+        return _plinkDef.isCreateInstanceOperation(operation);
+    }
+
+    public RoutingInfo findRoute(MyRoleMessageExchangeImpl mex) {
         if (__log.isTraceEnabled()) {
             __log.trace(ObjectPrinter.stringifyMethodEnter(this + ":inputMsgRcvd", new Object[] {
                     "messageExchange", mex }));
@@ -93,23 +92,13 @@
         if (operation == null) {
             __log.error(__msgs.msgUnknownOperation(mex.getOperationName(), _plinkDef.myRolePortType.getQName()));
             mex.setFailure(MessageExchange.FailureType.UNKNOWN_OPERATION, mex.getOperationName(), null);
-            return;
+            return null;
         }
+        setMexRole(mex);
 
-        mex.getDAO().setPartnerLinkModelId(_plinkDef.getId());
-        mex.setPortOp(_plinkDef.myRolePortType, operation);
-        mex.setPattern(operation.getOutput() == null ? MessageExchange.MessageExchangePattern.REQUEST_ONLY
-                : MessageExchange.MessageExchangePattern.REQUEST_RESPONSE);
-
-        // Is this a /possible/ createInstance Operation?
-        boolean isCreateInstnace = _plinkDef.isCreateInstanceOperation(operation);
-
-        // now, the tricks begin: when a message arrives we have to see if
-        // there
-        // is anyone waiting for it.
-        // Get the correlator, a persisted communnication-reduction data
-        // structure
-        // supporting correlation correlationKey matching!
+        // now, the tricks begin: when a message arrives we have to see if there
+        // is anyone waiting for it. Get the correlator, a persisted communication-reduction
+        // data structure supporting correlation correlationKey matching!
         String correlatorId = BpelProcess.genCorrelatorId(_plinkDef, operation.getName());
 
         CorrelatorDAO correlator = _process.getProcessDAO().getCorrelator(correlatorId);
@@ -129,7 +118,7 @@
             // a message format problem.
             __log.debug("Unable to evaluate correlation keys, invalid message format. ",ime);
             mex.setFailure(MessageExchange.FailureType.FORMAT_ERROR, ime.getMessage(), null);
-            return;
+            return null;
         }
 
         String mySessionId = mex.getProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID);
@@ -154,116 +143,129 @@
             }
         }
 
-        // TODO - ODE-58
+        return new RoutingInfo(messageRoute, matchedKey, correlator, keys);
+    }
 
-        // If no luck, and this operation qualifies for create-instance
-        // treatment, then create a new process
-        // instance.
-        if (messageRoute == null && isCreateInstnace) {
-            if (__log.isDebugEnabled()) {
-                __log.debug("INPUTMSG: " + correlatorId + ": routing failed, CREATING NEW INSTANCE");
-            }
-            ProcessDAO processDAO = _process.getProcessDAO();
+    class RoutingInfo {
+        MessageRouteDAO messageRoute;
+        CorrelationKey matchedKey;
+        CorrelatorDAO correlator;
+        CorrelationKey[] keys;
 
-            if (_process._pconf.getState() == ProcessState.RETIRED) {
-                throw new InvalidProcessException("Process is retired.", InvalidProcessException.RETIRED_CAUSE_CODE);
-            }
+        public RoutingInfo(MessageRouteDAO messageRoute, CorrelationKey matchedKey,
+                           CorrelatorDAO correlator, CorrelationKey[] keys) {
+            this.messageRoute = messageRoute;
+            this.matchedKey = matchedKey;
+            this.correlator = correlator;
+            this.keys = keys;
+        }
+    }
 
-            if (!_process.processInterceptors(mex, InterceptorInvoker.__onNewInstanceInvoked)) {
-                __log.debug("Not creating a new instance for mex " + mex + "; interceptor prevented!");
-                return;
-            }
+    public void invokeNewInstance(MyRoleMessageExchangeImpl mex, RoutingInfo routing) {
+        Operation operation = getMyRoleOperation(mex.getOperationName());
 
-            ProcessInstanceDAO newInstance = processDAO.createInstance(correlator);
+        if (__log.isDebugEnabled()) {
+            __log.debug("INPUTMSG: " + routing.correlator.getCorrelatorId() + ": routing failed, CREATING NEW INSTANCE");
+        }
+        ProcessDAO processDAO = _process.getProcessDAO();
 
-            BpelRuntimeContextImpl instance = _process
-                    .createRuntimeContext(newInstance, new PROCESS(_process.getOProcess()), mex);
+        if (_process._pconf.getState() == ProcessState.RETIRED) {
+            throw new InvalidProcessException("Process is retired.", InvalidProcessException.RETIRED_CAUSE_CODE);
+        }
 
-            // send process instance event
-            NewProcessInstanceEvent evt = new NewProcessInstanceEvent(new QName(_process.getOProcess().targetNamespace,
-                    _process.getOProcess().getName()), _process.getProcessDAO().getProcessId(), newInstance.getInstanceId());
-            evt.setPortType(mex.getPortType().getQName());
-            evt.setOperation(operation.getName());
-            evt.setMexId(mex.getMessageExchangeId());
-            _process._debugger.onEvent(evt);
-            _process.saveEvent(evt, newInstance);
-            mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.CREATE_INSTANCE);
-            mex.getDAO().setInstance(newInstance);
-
-            instance.execute();
-        } else if (messageRoute != null) {
-            if (__log.isDebugEnabled()) {
-                __log.debug("INPUTMSG: " + correlatorId + ": ROUTING to instance "
-                        + messageRoute.getTargetInstance().getInstanceId());
-            }
+        if (!_process.processInterceptors(mex, InterceptorInvoker.__onNewInstanceInvoked)) {
+            __log.debug("Not creating a new instance for mex " + mex + "; interceptor prevented!");
+            return;
+        }
 
-            ProcessInstanceDAO instanceDao = messageRoute.getTargetInstance();
+        ProcessInstanceDAO newInstance = processDAO.createInstance(routing.correlator);
 
-            // Reload process instance for DAO.
-            BpelRuntimeContextImpl instance = _process.createRuntimeContext(instanceDao, null, null);
-            instance.inputMsgMatch(messageRoute.getGroupId(), messageRoute.getIndex(), mex);
-
-            // Kill the route so some new message does not get routed to
-            // same process instance.
-            correlator.removeRoutes(messageRoute.getGroupId(), instanceDao);
-
-            // send process instance event
-            CorrelationMatchEvent evt = new CorrelationMatchEvent(new QName(_process.getOProcess().targetNamespace,
-                    _process.getOProcess().getName()), _process.getProcessDAO().getProcessId(),
-                    instanceDao.getInstanceId(), matchedKey);
-            evt.setPortType(mex.getPortType().getQName());
-            evt.setOperation(operation.getName());
-            evt.setMexId(mex.getMessageExchangeId());
+        BpelRuntimeContextImpl instance = _process
+                .createRuntimeContext(newInstance, new PROCESS(_process.getOProcess()), mex);
 
-            _process._debugger.onEvent(evt);
-            // store event
-            _process.saveEvent(evt, instanceDao);
+        // send process instance event
+        NewProcessInstanceEvent evt = new NewProcessInstanceEvent(new QName(_process.getOProcess().targetNamespace,
+                _process.getOProcess().getName()), _process.getProcessDAO().getProcessId(), newInstance.getInstanceId());
+        evt.setPortType(mex.getPortType().getQName());
+        evt.setOperation(operation.getName());
+        evt.setMexId(mex.getMessageExchangeId());
+        _process._debugger.onEvent(evt);
+        _process.saveEvent(evt, newInstance);
+        mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.CREATE_INSTANCE);
+        mex.getDAO().setInstance(newInstance);
 
-            // EXPERIMENTAL -- LOCK
-            // instanceDao.lock();
+        instance.execute();
+    }
 
-            mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.MATCHED);
-            mex.getDAO().setInstance(messageRoute.getTargetInstance());
-            instance.execute();
-        } else {
-            if (__log.isDebugEnabled()) {
-                __log.debug("INPUTMSG: " + correlatorId + ": SAVING to DB (no match) ");
-            }
+    public void invokeInstance(MyRoleMessageExchangeImpl mex, RoutingInfo routing) {
+        Operation operation = getMyRoleOperation(mex.getOperationName());
+        if (__log.isDebugEnabled()) {
+            __log.debug("INPUTMSG: " + routing.correlator.getCorrelatorId() + ": ROUTING to instance "
+                    + routing.messageRoute.getTargetInstance().getInstanceId());
+        }
 
-            if (!mex.isAsynchronous()) {
-                mex.setFailure(MessageExchange.FailureType.NOMATCH, "No process instance matching correlation keys.", null);
+        ProcessInstanceDAO instanceDao = routing.messageRoute.getTargetInstance();
 
-            } else {
-                // send event
-                CorrelationNoMatchEvent evt = new CorrelationNoMatchEvent(mex.getPortType().getQName(), mex
-                        .getOperation().getName(), mex.getMessageExchangeId(), keys);
-
-                evt.setProcessId(_process.getProcessDAO().getProcessId());
-                evt.setProcessName(new QName(_process.getOProcess().targetNamespace, _process.getOProcess().getName()));
-                _process._debugger.onEvent(evt);
+        // Reload process instance for DAO.
+        BpelRuntimeContextImpl instance = _process.createRuntimeContext(instanceDao, null, null);
+        instance.inputMsgMatch(routing.messageRoute.getGroupId(), routing.messageRoute.getIndex(), mex);
+
+        // Kill the route so some new message does not get routed to
+        // same process instance.
+        routing.correlator.removeRoutes(routing.messageRoute.getGroupId(), instanceDao);
+
+        // send process instance event
+        CorrelationMatchEvent evt = new CorrelationMatchEvent(new QName(_process.getOProcess().targetNamespace,
+                _process.getOProcess().getName()), _process.getProcessDAO().getProcessId(),
+                instanceDao.getInstanceId(), routing.matchedKey);
+        evt.setPortType(mex.getPortType().getQName());
+        evt.setOperation(operation.getName());
+        evt.setMexId(mex.getMessageExchangeId());
+
+        _process._debugger.onEvent(evt);
+        // store event
+        _process.saveEvent(evt, instanceDao);
+
+        mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.MATCHED);
+        mex.getDAO().setInstance(routing.messageRoute.getTargetInstance());
+        instance.execute();
+    }
 
-                mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.QUEUED);
+    public void noRoutingMatch(MyRoleMessageExchangeImpl mex, RoutingInfo routing) {
+        if (__log.isDebugEnabled()) {
+            __log.debug("INPUTMSG: " + routing.correlator.getCorrelatorId() + ": SAVING to DB (no match) ");
+        }
 
-                // No match, means we add message exchange to the queue.
-                correlator.enqueueMessage(mex.getDAO(), keys);
+        if (!mex.isAsynchronous()) {
+            mex.setFailure(MessageExchange.FailureType.NOMATCH, "No process instance matching correlation keys.", null);
 
-            }
-        }
+        } else {
+            // send event
+            CorrelationNoMatchEvent evt = new CorrelationNoMatchEvent(mex.getPortType().getQName(), mex
+                    .getOperation().getName(), mex.getMessageExchangeId(), routing.keys);
+
+            evt.setProcessId(_process.getProcessDAO().getProcessId());
+            evt.setProcessName(new QName(_process.getOProcess().targetNamespace, _process.getOProcess().getName()));
+            _process._debugger.onEvent(evt);
+
+            mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.QUEUED);
 
-        // Now we have to update our message exchange status. If the <reply>
-        // was not hit during the
-        // invocation, then we will be in the "REQUEST" phase which means
-        // that either this was a one-way
-        // or a two-way that needs to delivery the reply asynchronously.
-        if (mex.getStatus() == MessageExchange.Status.REQUEST) {
-            mex.setStatus(MessageExchange.Status.ASYNC);
+            // No match, means we add message exchange to the queue.
+            routing.correlator.enqueueMessage(mex.getDAO(), routing.keys);
         }
     }
 
+    private void setMexRole(MyRoleMessageExchangeImpl mex) {
+        Operation operation = getMyRoleOperation(mex.getOperationName());
+        mex.getDAO().setPartnerLinkModelId(_plinkDef.getId());
+        mex.setPortOp(_plinkDef.myRolePortType, operation);
+        mex.setPattern(operation.getOutput() == null ? MessageExchange.MessageExchangePattern.REQUEST_ONLY
+                : MessageExchange.MessageExchangePattern.REQUEST_RESPONSE);
+    }
+
     @SuppressWarnings("unchecked")
     private Operation getMyRoleOperation(String operationName) {
-        Operation op = _plinkDef.getMyRoleOperation(operationName);
-        return op;
+        return _plinkDef.getMyRoleOperation(operationName);
     }
 
     private CorrelationKey[] computeCorrelationKeys(MyRoleMessageExchangeImpl mex) {