You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by sa...@apache.org on 2016/01/20 12:28:57 UTC

ode git commit: ODE-974: Routing logic for messages arriving early has been fixed

Repository: ode
Updated Branches:
  refs/heads/ode-1.3.x ba61210b4 -> 501f70f20


ODE-974: Routing logic for messages arriving early has been fixed


Project: http://git-wip-us.apache.org/repos/asf/ode/repo
Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/501f70f2
Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/501f70f2
Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/501f70f2

Branch: refs/heads/ode-1.3.x
Commit: 501f70f200d02d82fc0607ccd5d7d8b76b000b6e
Parents: ba61210
Author: sathwik <sa...@apache.org>
Authored: Wed Jan 20 16:56:36 2016 +0530
Committer: sathwik <sa...@apache.org>
Committed: Wed Jan 20 16:56:36 2016 +0530

----------------------------------------------------------------------
 .../org/apache/ode/bpel/dao/ProcessDAO.java     |  10 ++
 .../apache/ode/bpel/engine/BpelEngineImpl.java  |  14 +-
 .../org/apache/ode/bpel/engine/BpelProcess.java | 141 ++++++++++++++-----
 .../ode/bpel/engine/BpelRuntimeContextImpl.java |   5 +
 .../ode/bpel/engine/PartnerLinkMyRoleImpl.java  |  74 ++++++----
 .../apache/ode/bpel/memdao/ProcessDaoImpl.java  |   6 +
 .../ode/daohib/bpel/CorrelatorDaoImpl.java      |  13 +-
 .../apache/ode/daohib/bpel/ProcessDaoImpl.java  |  39 ++++-
 .../ode/daohib/bpel/hobj/HCorrelationSet.java   |   5 +
 .../apache/ode/dao/jpa/CorrelatorDAOImpl.java   |  10 +-
 .../ode/dao/jpa/MessageExchangeDAOImpl.java     |   2 +-
 .../org/apache/ode/dao/jpa/ProcessDAOImpl.java  |  14 +-
 12 files changed, 251 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ode/blob/501f70f2/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessDAO.java
----------------------------------------------------------------------
diff --git a/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessDAO.java b/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessDAO.java
index 97c252b..c68ac53 100644
--- a/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessDAO.java
+++ b/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessDAO.java
@@ -118,4 +118,14 @@ public interface ProcessDAO {
      * @return all instances that haven't completed, use with care as there could be a lot of them 
      */
     Collection<ProcessInstanceDAO> getActiveInstances();
+
+    /**
+     * Locates process instances for a specific process version that matches correlation key and instance state
+     * @param ckey
+     *          Correlation key
+     * @param processInstanceState
+     *           Instance state org.apache.ode.bpel.common.ProcessState
+     * @return collection of {@link ProcessInstanceDAO} that match correlation key, instance state
+     */
+    Collection<ProcessInstanceDAO> findInstance(CorrelationKey ckey,short processInstanceState);
 }

http://git-wip-us.apache.org/repos/asf/ode/blob/501f70f2/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
----------------------------------------------------------------------
diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
index 6a1edce..03efdd6 100644
--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
@@ -483,15 +483,25 @@ public class BpelEngineImpl implements BpelEngine {
                 if (we.getType() == JobType.INVOKE_INTERNAL || we.getType() == JobType.MEX_MATCHER) {
                     List<BpelProcess> processes = getAllProcesses(we.getProcessId());
                     boolean routed = false;
+
+                    //try to find the target process and execute
                     jobInfo.jobDetail.detailsExt.put("enqueue", false);
 
                     for(BpelProcess proc : processes) {
-                        routed = routed || proc.handleJobDetails(jobInfo.jobDetail);
+                        routed = proc.handleJobDetails(jobInfo.jobDetail);
+
+                        if(routed) break;
                     }
 
+                    //no target process was identified, enqueue the mex for later processing
                     if(!routed && we.getType() == JobType.INVOKE_INTERNAL) {
                         jobInfo.jobDetail.detailsExt.put("enqueue", true);
-                        process.handleJobDetails(jobInfo.jobDetail);
+
+                        for(BpelProcess proc : processes) {
+                            routed = proc.handleJobDetails(jobInfo.jobDetail);
+
+                            if(routed) break;
+                        }
                     }
                 }
                 else {

http://git-wip-us.apache.org/repos/asf/ode/blob/501f70f2/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
----------------------------------------------------------------------
diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
index b5cef27..1a7ca92 100644
--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
@@ -21,9 +21,11 @@ package org.apache.ode.bpel.engine;
 import java.io.InputStream;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -36,12 +38,15 @@ import javax.xml.namespace.QName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.ode.agents.memory.SizingAgent;
+import org.apache.ode.bpel.common.CorrelationKey;
+import org.apache.ode.bpel.common.CorrelationKeySet;
 import org.apache.ode.bpel.common.FaultException;
 import org.apache.ode.bpel.common.ProcessState;
 import org.apache.ode.bpel.dao.BpelDAOConnection;
 import org.apache.ode.bpel.dao.DeferredProcessInstanceCleanable;
 import org.apache.ode.bpel.dao.ProcessDAO;
 import org.apache.ode.bpel.dao.ProcessInstanceDAO;
+import org.apache.ode.bpel.engine.PartnerLinkMyRoleImpl.RoutingInfo;
 import org.apache.ode.bpel.engine.extvar.ExternalVariableConf;
 import org.apache.ode.bpel.engine.extvar.ExternalVariableManager;
 import org.apache.ode.bpel.evt.ProcessInstanceEvent;
@@ -51,6 +56,7 @@ import org.apache.ode.bpel.iapi.BpelEngineException;
 import org.apache.ode.bpel.iapi.Endpoint;
 import org.apache.ode.bpel.iapi.EndpointReference;
 import org.apache.ode.bpel.iapi.MessageExchange;
+import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
 import org.apache.ode.bpel.iapi.PartnerRoleChannel;
 import org.apache.ode.bpel.iapi.ProcessConf;
 import org.apache.ode.bpel.iapi.Scheduler;
@@ -217,38 +223,17 @@ public class BpelProcess {
                 return false;
             }
 
-            mex.getDAO().setProcess(getProcessDAO());
-
-            if (!processInterceptors(mex, InterceptorInvoker.__onProcessInvoked)) {
-                __log.debug("Aborting processing of mex " + mex + " due to interceptors.");
-                return false;
-            }
-
-            markused();
-
-            // Ideally, if Java supported closure, the routing code would return null or the appropriate
-            // closure to handle the route.
-            List<PartnerLinkMyRoleImpl.RoutingInfo> routings = null;
-            for (PartnerLinkMyRoleImpl target : targets) {
-                routings = target.findRoute(mex);
-                boolean createInstance = target.isCreateInstance(mex);
-
-                if (mex.getStatus() != MessageExchange.Status.FAILURE && routings!=null) {
-                    for (PartnerLinkMyRoleImpl.RoutingInfo routing : routings) {
-                        routed = routed || invokeHandler.invoke(target, routing, createInstance);
-                    }
-                }
-                if (routed) {
-                    break;
-                }
+            //Actual identification of the routes and invocation of the target process will happen when enqueue is disabled.
+            //This is the main conditional block of code that does the actual work.
+            //Its only after running this block with enqueue disabled, it can be identified that mex was not routable.
+            //There is a separate logic following this 'if' condition to handle the mex when enqueue is enabled.
+            if(!enqueue) {
+                routed = findRouteAndInvoke(targets, mex, invokeHandler);
             }
 
             // Nothing found, saving for later
-            if (!routed  && enqueue) {
-                // TODO this is kind of hackish when no match and more than one myrole is selected.
-                // we save the routing on the last myrole
-                // actually the message queue should be attached to the instance instead of the correlator
-                targets.get(targets.size()-1).noRoutingMatch(mex, routings);
+            if (enqueue && !routed) {
+                routed = noRoutingMatch(targets, mex);
             } else {
                 // Now we have to update our message exchange status. If the <reply> was not hit during the
                 // invocation, then we will be in the "REQUEST" phase which means that either this was a one-way
@@ -263,12 +248,100 @@ public class BpelProcess {
             _hydrationLatch.release(1);
         }
 
-        // For a one way, once the engine is done, the mex can be safely released.
-        // Sean: not really, if route is not found, we cannot delete the mex yet
-//        if (mex.getPattern().equals(MessageExchange.MessageExchangePattern.REQUEST_ONLY) && routed && getCleanupCategories(false).contains(CLEANUP_CATEGORY.MESSAGES)) {
-//            mex.release();
-//        }
+        return routed;
+    }
+
+    //this method should be invoked within the ambit of _hydrationLatch
+    private boolean findRouteAndInvoke(List<PartnerLinkMyRoleImpl> targets, MyRoleMessageExchangeImpl mex, InvokeHandler invokeHandler) {
+        boolean routed = false;
+
+        //if mex is already queued then disallow overriding of process on the mex
+        if(!MyRoleMessageExchange.CorrelationStatus.QUEUED.equals(mex.getCorrelationStatus()))
+            mex.getDAO().setProcess(getProcessDAO());
+
+        if (!processInterceptors(mex, InterceptorInvoker.__onProcessInvoked)) {
+            __log.debug("Aborting processing of mex " + mex + " due to interceptors.");
+            return false;
+        }
+
+        markused();
+
+        // Ideally, if Java supported closure, the routing code would return null or the appropriate
+        // closure to handle the route.
+        List<PartnerLinkMyRoleImpl.RoutingInfo> routings = null;
+        for (PartnerLinkMyRoleImpl target : targets) {
+            routings = target.findRoute(mex);
+            boolean createInstance = target.isCreateInstance(mex);
+
+            if (mex.getStatus() != MessageExchange.Status.FAILURE && routings!=null) {
+                for (PartnerLinkMyRoleImpl.RoutingInfo routing : routings) {
+                    routed = routed || invokeHandler.invoke(target, routing, createInstance);
+                }
+            }
+            if (routed) {
+                break;
+            }
+        }
+        return routed;
+    }
+
+    //this method should be invoked within the ambit of _hydrationLatch
+    private boolean noRoutingMatch(List<PartnerLinkMyRoleImpl> targets,MyRoleMessageExchangeImpl mex) {
+        boolean routed = false;
+        boolean enqueue = true;
+        List<PartnerLinkMyRoleImpl.RoutingInfo> routings = null;
+
+        /* try to find the active instance that is waiting on the correlated value and then associate the corresponding
+         * processDAO,correlator on the mex and enqueue for later processing.
+         */
+
+        // Need to iterate over the partnerlinks to identify the partnerlink that has the operation defined in the mex.
+        for (Iterator<PartnerLinkMyRoleImpl> targetItr = targets.iterator();
+                (targetItr.hasNext() && !routed);) {
+
+            PartnerLinkMyRoleImpl target = targetItr.next();
+            routings = target.findRoute(mex,enqueue);
+
+            //routings will be null if the mex operation is no defined in this myRole, iterate over next myRole.
+            if (routings != null) {
+                RoutingInfo routing = routings.get(routings.size()-1);
+
+                if (routing != null) {
+
+                    for( Iterator<CorrelationKeySet> aSubSetItr = routing.wholeKeySet.findSubSets().iterator();
+                            (aSubSetItr.hasNext() && !routed);) {
 
+                        CorrelationKeySet aSubSet = aSubSetItr.next();
+
+                        for(Iterator<CorrelationKey> keyItr = aSubSet.iterator();
+                                (keyItr.hasNext() && !routed);) {
+
+                            CorrelationKey key = keyItr.next();
+                            __log.info("noRoutingMatch: Finding active instance correlated with {} and process pid {}",new Object[]{key,_pid});
+
+                            // Assumption is, process instance is uniquely identifiable by any single initiated correlation key across multiple versions
+                            // of a same process type.
+
+                            // We need to make sure the PID of process of the instance is same as that of the
+                            // partnerlink's associated process in the iteration. Otherwise we might end up
+                            // associating wrong correlator with the mex.
+                            Collection<ProcessInstanceDAO> instanceDaoList = getProcessDAO().findInstance(key,ProcessState.STATE_ACTIVE);
+
+                            if (!instanceDaoList.isEmpty()) {
+                                ProcessInstanceDAO instance = instanceDaoList.iterator().next();
+                                mex.getDAO().setProcess(instance.getProcess());
+                                target.noRoutingMatch(mex, routing);
+                                routed = true;
+
+                                __log.info("noRoutingMatch: Active instance found instanceID: {} correlated with {} and process pid {}",
+                                        new Object[]{instance.getInstanceId(),key,_pid});
+
+                            }
+                        }
+                    }
+                }
+            }
+        }
         return routed;
     }
 

http://git-wip-us.apache.org/repos/asf/ode/blob/501f70f2/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
----------------------------------------------------------------------
diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
index 8ac70c3..aff7733 100644
--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
@@ -1517,6 +1517,11 @@ public class BpelRuntimeContextImpl implements BpelRuntimeContext {
 
             MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(_bpelProcess, _bpelProcess._engine, mexdao);
 
+            mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.MATCHED);
+            mex.getDAO().setInstance(_dao);
+            if (mex.getDAO().getCreateTime() == null)
+                mex.getDAO().setCreateTime(getCurrentEventDateTime());
+
             inputMsgMatch(mroute.getGroupId(), mroute.getIndex(), mex);
             execute();
 

http://git-wip-us.apache.org/repos/asf/ode/blob/501f70f2/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
----------------------------------------------------------------------
diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
index 203a199..42dfd7a 100644
--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
@@ -94,8 +94,12 @@ public class PartnerLinkMyRoleImpl extends PartnerLinkRoleImpl {
     }
 
     public List<RoutingInfo> findRoute(MyRoleMessageExchangeImpl mex) {
-    	List<RoutingInfo> routingInfos = new ArrayList<RoutingInfo>();
-    	
+        return findRoute(mex, false);
+    }
+
+    public List<RoutingInfo> findRoute(MyRoleMessageExchangeImpl mex, boolean enqueue) {
+        List<RoutingInfo> routingInfos = new ArrayList<RoutingInfo>();
+
         if (__log.isTraceEnabled()) {
             __log.trace(ObjectPrinter.stringifyMethodEnter(this + ":inputMsgRcvd", new Object[] {
                     "messageExchange", mex }));
@@ -132,7 +136,7 @@ public class PartnerLinkMyRoleImpl extends PartnerLinkRoleImpl {
             mex.setFailure(MessageExchange.FailureType.FORMAT_ERROR, ime.getMessage(), null);
             return null;
         }
-        
+
         String mySessionId = mex.getProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID);
         String partnerSessionId = mex.getProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID);
         if (__log.isDebugEnabled()) {
@@ -141,19 +145,22 @@ public class PartnerLinkMyRoleImpl extends PartnerLinkRoleImpl {
                     + " partnerSessionId=" + partnerSessionId);
         }
 
-        // Try to find a route for one of our keys.
-        List<MessageRouteDAO> messageRoutes = correlator.findRoute(keySet);
-        if (messageRoutes != null && messageRoutes.size() > 0) {
-            for (MessageRouteDAO messageRoute : messageRoutes) {
-                if (__log.isDebugEnabled()) {
-                    __log.debug("INPUTMSG: " + correlatorId + ": ckeySet " + messageRoute.getCorrelationKeySet() + " route is to " + messageRoute);
+        //Avoid searching for message route when enqueue is enabled. It is only when no message route is found, enqueue will be enabled.
+        if(!enqueue){
+            // Try to find a route for one of our keys.
+            List<MessageRouteDAO> messageRoutes = correlator.findRoute(keySet);
+            if (messageRoutes != null && messageRoutes.size() > 0) {
+                for (MessageRouteDAO messageRoute : messageRoutes) {
+                    if (__log.isDebugEnabled()) {
+                        __log.debug("INPUTMSG: " + correlatorId + ": ckeySet " + messageRoute.getCorrelationKeySet() + " route is to " + messageRoute);
+                    }
+                    routingInfos.add(new RoutingInfo(messageRoute, messageRoute.getCorrelationKeySet(), correlator, keySet));
                 }
-                routingInfos.add(new RoutingInfo(messageRoute, messageRoute.getCorrelationKeySet(), correlator, keySet));
             }
         }
-        
+
         if (routingInfos.size() == 0) {
-        	routingInfos.add(new RoutingInfo(null, null, correlator, keySet));
+            routingInfos.add(new RoutingInfo(null, null, correlator, keySet));
         }
 
         return routingInfos;
@@ -252,31 +259,38 @@ public class PartnerLinkMyRoleImpl extends PartnerLinkRoleImpl {
         instance.execute();
     }
 
+    /**
+     * @deprecated - Now an attempt is made to identify the correct routing in the BpelProcess.invokeProcess() with enqueue enabled.
+     * @param mex
+     * @param routings
+     */
     public void noRoutingMatch(MyRoleMessageExchangeImpl mex, List<RoutingInfo> routings) {
+        // enqueue message with the last message route, as per the comments in caller (@see BpelProcess.invokeProcess())
+        RoutingInfo routing = (routings != null && routings.size() > 0) ? routings.get(routings.size() - 1) : null;
+        noRoutingMatch(mex, routing);
+    }
+
+    public void noRoutingMatch(MyRoleMessageExchangeImpl mex, RoutingInfo routing) {
         if (!mex.isAsynchronous()) {
             mex.setFailure(MessageExchange.FailureType.NOMATCH, "No process instance matching correlation keys.", null);
         } else {
-        	// enqueue message with the last message route, as per the comments in caller (@see BpelProcess.invokeProcess())
-        	RoutingInfo routing = 
-        		(routings != null && routings.size() > 0) ? 
-        				routings.get(routings.size() - 1) : null;
-        	if (routing != null) {
+            if (routing != null) {
                 if (__log.isDebugEnabled()) {
                     __log.debug("INPUTMSG: " + routing.correlator.getCorrelatorId() + ": SAVING to DB (no match) ");
                 }
 
-	            // send event
-	            CorrelationNoMatchEvent evt = new CorrelationNoMatchEvent(mex.getPortType().getQName(), mex
-	                    .getOperation().getName(), mex.getMessageExchangeId(), routing.wholeKeySet);
-	
-	            evt.setProcessId(_process.getProcessDAO().getProcessId());
-	            evt.setProcessName(new QName(_process.getOProcess().targetNamespace, _process.getOProcess().getName()));
-	            _process._debugger.onEvent(evt);
-	
-	            mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.QUEUED);
-	
-	            // No match, means we add message exchange to the queue.
-	            routing.correlator.enqueueMessage(mex.getDAO(), routing.wholeKeySet);
+                // send event
+                CorrelationNoMatchEvent evt = new CorrelationNoMatchEvent(mex.getPortType().getQName(), mex
+                        .getOperation().getName(), mex.getMessageExchangeId(), routing.wholeKeySet);
+
+                evt.setProcessId(_process.getProcessDAO().getProcessId());
+                evt.setProcessName(new QName(_process.getOProcess().targetNamespace, _process.getOProcess().getName()));
+                _process._debugger.onEvent(evt);
+
+                mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.QUEUED);
+
+                // No match, means we add message exchange to the queue.
+                routing.correlator.enqueueMessage(mex.getDAO(), routing.wholeKeySet);
 
                 //Second matcher needs to be registered here
                 JobDetails we = new JobDetails();
@@ -289,7 +303,7 @@ public class PartnerLinkMyRoleImpl extends PartnerLinkRoleImpl {
                 }else{
                     _process._engine._contexts.scheduler.schedulePersistedJob(we, null);
                 }
-        	}
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ode/blob/501f70f2/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java
----------------------------------------------------------------------
diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java
index 3e1ebd7..41d94f2 100644
--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java
+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java
@@ -251,4 +251,10 @@ class ProcessDaoImpl extends DaoBaseImpl implements ProcessDAO {
             }
         }
     }
+
+    @Override
+    public Collection<ProcessInstanceDAO> findInstance(CorrelationKey cckey, short processInstanceState) {
+        // TODO Auto-generated method stub
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ode/blob/501f70f2/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
----------------------------------------------------------------------
diff --git a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
index c4cbd6c..11aaf6b 100644
--- a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
+++ b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
@@ -50,9 +50,10 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO {
     /** filter for finding a matching selector. */
     private static final String LOCK_SELECTORS = "update from HCorrelatorSelector as hs set hs.lock = hs.lock+1 where hs.processType = :processType";
     private static final String CHECK_SELECTORS = "from HCorrelatorSelector as hs where hs.processType = :processType and hs.correlator.correlatorId = :correlatorId";
-    private static final String FLTR_SELECTORS = "from HCorrelatorSelector as hs where hs.processType = :processType and hs.correlator.correlatorId = :correlatorId";
+    //private static final String FLTR_SELECTORS = "from HCorrelatorSelector as hs where hs.processType = :processType and hs.correlator.correlatorId = :correlatorId";
+    private static final String FLTR_SELECTORS = "from HCorrelatorSelector as hs where hs.correlator = :correlator";
     private static final String FLTR_SELECTORS_SUBQUERY = ("from HCorrelatorSelector as hs where hs.processType = :processType and hs.correlatorId = " +
-            "(select hc.id from HCorrelator as hc where hc.correlatorId = :correlatorId )").intern();
+            "(select hc.id from HCorrelator as hc where hc.correlatorId = :correlatorId)").intern();
 
     /** Query for removing routes. */
     private static final String QRY_DELSELECTORS = "delete from HCorrelatorSelector where groupId = ? and instance = ?";
@@ -115,12 +116,12 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO {
         String hdr = "findRoute(keySet=" + keySet + "): ";
         if (__log.isDebugEnabled()) __log.debug(hdr);
 
-        String processType = new QName(_hobj.getProcess().getTypeNamespace(), _hobj.getProcess().getTypeName()).toString();
+        //String processType = new QName(_hobj.getProcess().getTypeNamespace(), _hobj.getProcess().getTypeName()).toString();
         List<CorrelationKeySet> subSets = keySet.findSubSets();
 
-        Query q = getSession().createQuery(generateSelectorQuery(_sm.canJoinForUpdate() ? FLTR_SELECTORS : FLTR_SELECTORS_SUBQUERY, subSets));
-        q.setString("processType", processType);
-        q.setString("correlatorId", _hobj.getCorrelatorId());
+        //Query q = getSession().createQuery(generateSelectorQuery(_sm.canJoinForUpdate() ? FLTR_SELECTORS : FLTR_SELECTORS_SUBQUERY, subSets));
+        Query q = getSession().createQuery(generateSelectorQuery(FLTR_SELECTORS, subSets));
+        q.setEntity("correlator", getHibernateObj());
 
         for( int i = 0; i < subSets.size(); i++ ) {
             q.setString("s" + i, subSets.get(i).toCanonicalString());

http://git-wip-us.apache.org/repos/asf/ode/blob/501f70f2/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessDaoImpl.java
----------------------------------------------------------------------
diff --git a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessDaoImpl.java b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessDaoImpl.java
index 278eb73..8d45610 100644
--- a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessDaoImpl.java
+++ b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessDaoImpl.java
@@ -35,6 +35,7 @@ import org.apache.ode.bpel.dao.CorrelatorDAO;
 import org.apache.ode.bpel.dao.DeferredProcessInstanceCleanable;
 import org.apache.ode.bpel.dao.ProcessDAO;
 import org.apache.ode.bpel.dao.ProcessInstanceDAO;
+import org.apache.ode.bpel.dao.ScopeDAO;
 import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY;
 import org.apache.ode.daohib.SessionManager;
 import org.apache.ode.daohib.bpel.hobj.HActivityRecovery;
@@ -54,6 +55,8 @@ import org.apache.ode.daohib.bpel.hobj.HProcessInstance;
 import org.apache.ode.daohib.bpel.hobj.HScope;
 import org.apache.ode.daohib.bpel.hobj.HVariableProperty;
 import org.apache.ode.daohib.bpel.hobj.HXmlData;
+import org.apache.ode.utils.stl.CollectionsX;
+import org.apache.ode.utils.stl.UnaryFunction;
 import org.hibernate.Criteria;
 import org.hibernate.Hibernate;
 import org.hibernate.Query;
@@ -134,11 +137,18 @@ public class ProcessDaoImpl extends HibernateDao implements ProcessDAO, Deferred
     @SuppressWarnings("unchecked")
     public Collection<ProcessInstanceDAO> findInstance(CorrelationKey ckeyValue) {
         entering("ProcessDaoImpl.findInstance");
-        Criteria criteria = getSession().createCriteria(HCorrelationSet.class);
-        criteria.add(Expression.eq("scope.instance.process.id",_process.getId()));
-        criteria.add(Expression.eq("value", ckeyValue.toCanonicalString()));
-        criteria.addOrder(Order.desc("scope.instance.created"));
-        return criteria.list();
+        Query qry = getSession().getNamedQuery(HCorrelationSet.SELECT_INSTANCES_BY_CORSETS);
+        qry.setParameter("ckey", ckeyValue.toCanonicalString());
+        Collection<HProcessInstance> resultList = qry.list();
+
+        ArrayList<ProcessInstanceDAO> ret = new ArrayList<ProcessInstanceDAO>();
+        CollectionsX.transform(ret, resultList, new UnaryFunction<HProcessInstance,ProcessInstanceDAO> () {
+          public ProcessInstanceDAO apply(HProcessInstance x) {
+            return new ProcessInstanceDaoImpl(_sm, x);
+          }
+         });
+
+        return ret;
     }
 
     /**
@@ -296,4 +306,23 @@ public class ProcessDaoImpl extends HibernateDao implements ProcessDAO, Deferred
     public String getGuid() {
         return _process.getGuid();
     }
+
+    @Override
+    public Collection<ProcessInstanceDAO> findInstance(CorrelationKey ckey, short processInstanceState) {
+        entering("ProcessDaoImpl.findInstance");
+        Query qry = getSession().getNamedQuery(HCorrelationSet.SELECT_INSTANCES_BY_CORSETS_STATE_PROCESS);
+        qry.setParameter("ckey", ckey.toCanonicalString());
+        qry.setEntity("process", getHibernateObj());
+        qry.setShort("state", processInstanceState);
+        Collection<HProcessInstance> resultList = qry.list();
+
+        ArrayList<ProcessInstanceDAO> ret = new ArrayList<ProcessInstanceDAO>();
+        CollectionsX.transform(ret, resultList, new UnaryFunction<HProcessInstance,ProcessInstanceDAO> () {
+          public ProcessInstanceDAO apply(HProcessInstance x) {
+            return new ProcessInstanceDaoImpl(_sm, x);
+          }
+         });
+
+        return ret;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ode/blob/501f70f2/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelationSet.java
----------------------------------------------------------------------
diff --git a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelationSet.java b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelationSet.java
index 1c0dba9..1656989 100644
--- a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelationSet.java
+++ b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelationSet.java
@@ -28,11 +28,16 @@ import java.util.Collection;
  * @hibernate.query name="SELECT_CORSET_IDS_BY_INSTANCES" query="select id from HCorrelationSet as c where c.instance in (:instances)"
  * @hibernate.query name="SELECT_CORSETS_BY_INSTANCES" query="from HCorrelationSet as c left join fetch c.properties where c.instance.id in (:instances)"
  * @hibernate.query name="SELECT_CORSETS_BY_PROCESS_STATES" query="from HCorrelationSet as c left join fetch c.process left join fetch c.instance where c.instance.state in (:states)"
+ * @hibernate.query name="SELECT_INSTANCES_BY_CORSETS" query="select cs.scope.instance from HCorrelationSet as cs where cs.value = :ckey"
+ * @hibernate.query name="SELECT_INSTANCES_BY_CORSETS_STATE_PROCESS" query="select cs.scope.instance from HCorrelationSet as cs where cs.value = :ckey and
+ *                          cs.process = :process and cs.instance.state = :state"
  */
 public class HCorrelationSet extends HObject{
     public static final String SELECT_CORSET_IDS_BY_INSTANCES = "SELECT_CORSET_IDS_BY_INSTANCES";
     public static final String SELECT_CORSETS_BY_INSTANCES = "SELECT_CORSETS_BY_INSTANCES";
     public static final String SELECT_CORSETS_BY_PROCESS_STATES = "SELECT_CORSETS_BY_PROCESS_STATES";
+    public static final String SELECT_INSTANCES_BY_CORSETS = "SELECT_INSTANCES_BY_CORSETS";
+    public static final String SELECT_INSTANCES_BY_CORSETS_STATE_PROCESS = "SELECT_INSTANCES_BY_CORSETS_STATE_PROCESS";
 
     private HProcess _process;
     private HProcessInstance _instance;

http://git-wip-us.apache.org/repos/asf/ode/blob/501f70f2/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java
----------------------------------------------------------------------
diff --git a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java
index 84681b7..9f6fbce 100644
--- a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java
+++ b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java
@@ -36,7 +36,7 @@ import java.util.List;
 public class CorrelatorDAOImpl extends OpenJPADAO implements CorrelatorDAO {
     private static Logger __log = LoggerFactory.getLogger(CorrelatorDAOImpl.class);
     public final static String DELETE_CORRELATORS_BY_PROCESS = "DELETE_CORRELATORS_BY_PROCESS";
-    private final static String ROUTE_BY_CKEY_HEADER = "select route from MessageRouteDAOImpl as route where route._correlator._process._processType = :ptype and route._correlator._correlatorKey = :corrkey";
+    private final static String ROUTE_BY_CKEY_HEADER = "select route from MessageRouteDAOImpl as route where route._correlator = :corr ";
 
     @Id
     @Column(name = "CORRELATOR_ID")
@@ -77,6 +77,11 @@ public class CorrelatorDAOImpl extends OpenJPADAO implements CorrelatorDAO {
             MessageExchangeDAOImpl mex = itr.next();
             if (mex.getCorrelationKeySet().isRoutableTo(correlationKeySet, false)) {
                 itr.remove();
+                MessageExchangeDAOImpl mexImpl = (MessageExchangeDAOImpl) mex;
+                mexImpl.setCorrelationKeySet(null);
+                mexImpl.setCorrelator(null);
+                //getEM().remove(mex);
+                //getEM().flush();
                 return mex;
             }
         }
@@ -101,8 +106,7 @@ public class CorrelatorDAOImpl extends OpenJPADAO implements CorrelatorDAO {
         }
         List<CorrelationKeySet> subSets = correlationKeySet.findSubSets();
         Query qry = getEM().createQuery(generateSelectorQuery(ROUTE_BY_CKEY_HEADER, subSets));
-        qry.setParameter("ptype", _process.getType().toString());
-        qry.setParameter("corrkey", _correlatorKey);
+        qry.setParameter("corr", this);
         for (int i = 0; i < subSets.size(); i++) {
             qry.setParameter("s" + i, subSets.get(i).toCanonicalString());
         }

http://git-wip-us.apache.org/repos/asf/ode/blob/501f70f2/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java
----------------------------------------------------------------------
diff --git a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java
index c0f35ff..e334520 100644
--- a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java
+++ b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java
@@ -323,7 +323,7 @@ public class MessageExchangeDAOImpl extends OpenJPADAO implements MessageExchang
     }
 
     void setCorrelationKeySet(CorrelationKeySet correlationKeySet) {
-        _correlationKeys = correlationKeySet.toCanonicalString();
+        _correlationKeys = correlationKeySet != null ? correlationKeySet.toCanonicalString() : null;
     }
 
     CorrelationKeySet getCorrelationKeySet() {

http://git-wip-us.apache.org/repos/asf/ode/blob/501f70f2/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java
----------------------------------------------------------------------
diff --git a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java
index 2b22d08..c109fda 100644
--- a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java
+++ b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java
@@ -44,7 +44,9 @@ import java.util.List;
 @NamedQueries({
     @NamedQuery(name="ActiveInstances", query="select i from ProcessInstanceDAOImpl as i where i._process = :process and i._state = :state"),
     @NamedQuery(name="InstanceByCKey", query="select cs._scope._processInstance from CorrelationSetDAOImpl as cs where cs._correlationKey = :ckey"),
-    @NamedQuery(name="CorrelatorByKey", query="select c from CorrelatorDAOImpl as c where c._correlatorKey = :ckey and c._process = :process")
+    @NamedQuery(name="CorrelatorByKey", query="select c from CorrelatorDAOImpl as c where c._correlatorKey = :ckey and c._process = :process"),
+    @NamedQuery(name="InstanceByCKeyProcessState", query="select cs._scope._processInstance from CorrelationSetDAOImpl as cs where cs._correlationKey = :ckey and "
+            + "cs._scope._processInstance._process = :process and cs._scope._processInstance._state = :state")
 })
 public class ProcessDAOImpl extends OpenJPADAO implements ProcessDAO {
     private static final Logger __log = LoggerFactory.getLogger(ProcessDAOImpl.class);
@@ -226,4 +228,14 @@ public class ProcessDAOImpl extends OpenJPADAO implements ProcessDAO {
         qry.setParameter("state", ProcessState.STATE_ACTIVE);
         return qry.getResultList();
     }
+
+    @Override
+    public Collection<ProcessInstanceDAO> findInstance(CorrelationKey ckey, short processInstanceState) {
+        //need to make this query more efficient
+        Query qry = getEM().createNamedQuery("InstanceByCKeyProcessState");
+        qry.setParameter("ckey", ckey.toCanonicalString());
+        qry.setParameter("process", this);
+        qry.setParameter("state", ProcessState.STATE_ACTIVE);
+        return qry.getResultList();
+    }
 }