You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by sa...@apache.org on 2016/03/05 07:42:34 UTC

ode git commit: ODE-1049: Optimised findRoute and dequeueMessage queries to use pre inistialized CorrelationKeySet cannonical value.

Repository: ode
Updated Branches:
  refs/heads/ode-1.3.x 0342b4031 -> 58b1a735d


ODE-1049: Optimised findRoute and dequeueMessage queries to use pre inistialized CorrelationKeySet cannonical value.


Project: http://git-wip-us.apache.org/repos/asf/ode/repo
Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/58b1a735
Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/58b1a735
Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/58b1a735

Branch: refs/heads/ode-1.3.x
Commit: 58b1a735ddb58c381e7ef356e8c70f3f6f71aa93
Parents: 0342b40
Author: sathwik <sa...@apache.org>
Authored: Sat Mar 5 12:12:04 2016 +0530
Committer: sathwik <sa...@apache.org>
Committed: Sat Mar 5 12:12:04 2016 +0530

----------------------------------------------------------------------
 .../org/apache/ode/bpel/dao/CorrelatorDAO.java  |  15 +++
 .../ode/bpel/engine/BpelRuntimeContextImpl.java |  14 ++-
 .../ode/bpel/memdao/CorrelatorDaoImpl.java      |  50 ++++-----
 .../ode/daohib/bpel/CorrelatorDaoImpl.java      | 103 ++++++++++---------
 .../apache/ode/dao/jpa/CorrelatorDAOImpl.java   |  78 +++++++-------
 5 files changed, 147 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ode/blob/58b1a735/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorDAO.java
----------------------------------------------------------------------
diff --git a/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorDAO.java b/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorDAO.java
index 7a2556c..21cd8eb 100644
--- a/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorDAO.java
+++ b/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorDAO.java
@@ -83,9 +83,11 @@ public interface CorrelatorDAO {
     Collection<CorrelatorMessageDAO> getAllMessages();
 
   /**
+   * @deprecated
    * Find a route matching the given correlation key.
    * @param correlationKey correlation key
    * @return route matching the given correlation key
+   * @see findRoute(CorrelationKeySet correlationKeySet,boolean isCorrleationKeySetPreInitialized)
    */
   List<MessageRouteDAO> findRoute(CorrelationKeySet correlationKeySet);
 
@@ -115,4 +117,17 @@ public interface CorrelatorDAO {
      * @return all routes registered on this correlator, use with care as it can potentially return a lot of values
      */
     Collection<MessageRouteDAO> getAllRoutes();
+
+    /**
+     * Find a route matching the given correlation key set.
+     * If the correlationKeySet is known to be pre initialized then isCorrleationKeySetPreInitialized can be set to true or false otherwise.
+     * Depending on the value of isCorrleationKeySetPreInitialized,
+     *  true -  canonical value of the correlationKeySet might be used to find the route.
+     *  false - canonical value of each of the subset of correlationKeySet might be used to find the route.
+     *
+     * @param correlationKeySet correlation key
+     * @param isCorrleationKeySetPreInitialized
+     * @return route matching the given correlation key
+     */
+    List<MessageRouteDAO> findRoute(CorrelationKeySet correlationKeySet,boolean isCorrleationKeySetPreInitialized);
 }

http://git-wip-us.apache.org/repos/asf/ode/blob/58b1a735/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
----------------------------------------------------------------------
diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
index aff7733..480094e 100644
--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
@@ -1482,11 +1482,13 @@ public class BpelRuntimeContextImpl implements BpelRuntimeContext {
         if (BpelProcess.__log.isDebugEnabled()) {
             __log.debug("MatcherEvent handling: correlatorId=" + correlatorId + ", ckeySet=" + ckeySet);
         }
+
         CorrelatorDAO correlator = _dao.getProcess().getCorrelator(correlatorId);
 
+
         // Find the route first, this is a SELECT FOR UPDATE on the "selector" row,
         // So we want to acquire the lock before we do anthing else.
-        List<MessageRouteDAO> mroutes = correlator.findRoute(ckeySet);
+        List<MessageRouteDAO> mroutes = correlator.findRoute(ckeySet,true);
         if (mroutes == null || mroutes.size() == 0) {
             // Ok, this means that a message arrived before we did, so nothing to do.
             __log.debug("MatcherEvent handling: nothing to do, route no longer in DB");
@@ -1495,13 +1497,15 @@ public class BpelRuntimeContextImpl implements BpelRuntimeContext {
 
         // Now see if there is a message that matches this selector.
         MessageExchangeDAO mexdao = correlator.dequeueMessage(ckeySet);
+
         if (mexdao != null) {
             __log.debug("MatcherEvent handling: found matching message in DB (i.e. message arrived before <receive>)");
-        	if( MessageExchangePattern.REQUEST_RESPONSE.toString().equals(mexdao.getPattern())) {
-        		__log.warn("A message arrived before a receive is ready for a request/response pattern. This may be processed to success. However, you should consider revising your process since a TCP port and a container thread will be held for a longer time and the process will not scale under heavy load.");
-        	}
-        	
+            if( MessageExchangePattern.REQUEST_RESPONSE.toString().equals(mexdao.getPattern())) {
+                __log.warn("A message arrived before a receive is ready for a request/response pattern. This may be processed to success. However, you should consider revising your process since a TCP port and a container thread will be held for a longer time and the process will not scale under heavy load.");
+            }
+
             for (MessageRouteDAO mroute : mroutes) {
+                __log.debug("Removing routes for GroupID: {} Instance: {}",mroute.getGroupId(),_dao.getInstanceId());
                 // We have a match, so we can get rid of the routing entries.
                 correlator.removeRoutes(mroute.getGroupId(), _dao);
             }

http://git-wip-us.apache.org/repos/asf/ode/blob/58b1a735/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java
----------------------------------------------------------------------
diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java
index 5de76f2..f6261d4 100644
--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java
+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java
@@ -66,30 +66,7 @@ class CorrelatorDaoImpl extends DaoBaseImpl implements CorrelatorDAO {
     }
 
     public List<MessageRouteDAO> findRoute(CorrelationKeySet keySet) {
-        List<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>();
-
-        assert keySet != null;
-
-        if (__log.isDebugEnabled()) {
-            __log.debug("findRoute: keySet=" + keySet);
-        }
-        boolean routed = false;
-        for (MessageRouteDaoImpl route : _routes) {
-            assert route._ckeySet != null;
-            
-            if(keySet.isRoutableTo(route._ckeySet, "all".equals(route.getRoute()))) {
-            	if ("all".equals(route.getRoute()))  {
-            		routes.add(route);
-            	} else {
-            		if (!routed) {
-            			routes.add(route);
-            		}
-            		routed = true;
-            	}
-            }
-        }
-
-        return routes;
+        return findRoute(keySet, false);
     }
 
     public String getCorrelatorId() {
@@ -186,4 +163,29 @@ class CorrelatorDaoImpl extends DaoBaseImpl implements CorrelatorDAO {
         return true;
     }
 
+    public List<MessageRouteDAO> findRoute(CorrelationKeySet correlationKeySet,boolean isCorrleationKeySetPreInitialized) {
+        List<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>();
+
+        assert correlationKeySet != null;
+        boolean routed = false;
+
+        __log.debug("findRoute: keySet={}",correlationKeySet);
+
+        for (MessageRouteDaoImpl route : _routes) {
+            assert route._ckeySet != null;
+
+            if(correlationKeySet.isRoutableTo(route._ckeySet, "all".equals(route.getRoute()))) {
+                if ("all".equals(route.getRoute()))  {
+                    routes.add(route);
+                } else {
+                    if (!routed) {
+                        routes.add(route);
+                    }
+                    routed = true;
+                }
+            }
+        }
+
+        return routes;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ode/blob/58b1a735/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
----------------------------------------------------------------------
diff --git a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
index 3f530cf..a467c2d 100644
--- a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
+++ b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
@@ -56,7 +56,7 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO {
             "(select hc.id from HCorrelator as hc where hc.correlatorId = :correlatorId)").intern();
 
     /** Query for removing routes. */
-    private static final String QRY_DELSELECTORS = "delete from HCorrelatorSelector where groupId = ? and instance = ?";
+    private static final String QRY_DELSELECTORS = "delete from HCorrelatorSelector where instance = ? and groupId = ?";
 
     private HCorrelator _hobj;
 
@@ -75,12 +75,8 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO {
 
         __log.debug("dequeueMessage({}): ",keySet);
 
-        List<CorrelationKeySet> subSets = keySet.findSubSets();
-        Query qry = getSession().createFilter(_hobj.getMessageCorrelations(),
-                generateUnmatchedQuery(subSets));
-        for( int i = 0; i < subSets.size(); i++ ) {
-            qry.setString("s" + i, subSets.get(i).toCanonicalString());
-        }
+        Query qry = getSession().createFilter(_hobj.getMessageCorrelations()," where this.correlationKey = :s0");
+        qry.setString("s0", keySet.toCanonicalString());
 
         // We really should consider the possibility of multiple messages matching a criteria.
         // When the message is handled, its not too convenient to attempt to determine if the
@@ -120,46 +116,7 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO {
 
     @SuppressWarnings("unchecked")
     public List<MessageRouteDAO> findRoute(CorrelationKeySet keySet) {
-        List<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>();
-
-        entering("CorrelatorDaoImpl.findRoute");
-        String hdr = "findRoute(keySet=" + keySet + "): ";
-        if (__log.isDebugEnabled()) __log.debug(hdr);
-
-        //String processType = new QName(_hobj.getProcess().getTypeNamespace(), _hobj.getProcess().getTypeName()).toString();
-        List<CorrelationKeySet> subSets = keySet.findSubSets();
-
-        //Query q = getSession().createQuery(generateSelectorQuery(_sm.canJoinForUpdate() ? FLTR_SELECTORS : FLTR_SELECTORS_SUBQUERY, subSets));
-        Query q = getSession().createQuery(generateSelectorQuery(FLTR_SELECTORS, subSets));
-        q.setEntity("correlator", getHibernateObj());
-
-        for( int i = 0; i < subSets.size(); i++ ) {
-            q.setString("s" + i, subSets.get(i).toCanonicalString());
-        }
-        // Make sure we obtain a lock for the selector we want to find.
-        q.setLockMode("hs", LockMode.UPGRADE);
-
-        List<HProcessInstance> targets = new ArrayList<HProcessInstance>();
-        List<HCorrelatorSelector> list;
-        try {
-            list = (List<HCorrelatorSelector>) q.list();
-        } catch (LockAcquisitionException e) {
-            throw new Scheduler.JobProcessorException(e, true);
-        }
-        for (HCorrelatorSelector selector : list) {
-            if (selector != null) {
-                boolean isRoutePolicyOne = selector.getRoute() == null || "one".equals(selector.getRoute());
-                if ("all".equals(selector.getRoute()) ||
-                        (isRoutePolicyOne && !targets.contains(selector.getInstance()))) {
-                    routes.add(new MessageRouteDaoImpl(_sm, selector));
-                    targets.add(selector.getInstance());
-                }
-            }
-        }
-
-        if(__log.isDebugEnabled()) __log.debug(hdr + "found " + routes);
-
-        return routes;
+        return findRoute(keySet,false);
     }
 
     private String generateUnmatchedQuery(List<CorrelationKeySet> subSets) {
@@ -271,8 +228,8 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO {
         __log.debug(hdr);
         Session session = getSession();
         Query q = session.createQuery(QRY_DELSELECTORS);
-        q.setString(0, routeGroupId); // groupId
-        q.setEntity(1, ((ProcessInstanceDaoImpl) target).getHibernateObj()); // instance
+        q.setEntity(0, ((ProcessInstanceDaoImpl) target).getHibernateObj()); // instance
+        q.setString(1, routeGroupId); // groupId
         int updates = q.executeUpdate();
         session.flush(); // explicit flush to ensure route removed
         if (__log.isDebugEnabled())
@@ -293,4 +250,52 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO {
         return routes;
     }
 
+    public List<MessageRouteDAO> findRoute(CorrelationKeySet keySet,boolean isCorrleationKeySetPreInitialized) {
+        List<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>();
+
+        entering("CorrelatorDaoImpl.findRoute");
+        __log.debug("findRoute(keySet={})",keySet);
+
+        //Query q = getSession().createQuery(generateSelectorQuery(_sm.canJoinForUpdate() ? FLTR_SELECTORS : FLTR_SELECTORS_SUBQUERY, subSets));
+        Query q = null;
+        if(isCorrleationKeySetPreInitialized){
+            q = getSession().createQuery(FLTR_SELECTORS + " and hs.correlationKey = :s0");
+            q.setEntity("correlator", getHibernateObj());
+            q.setString("s0", keySet.toCanonicalString());
+        } else {
+            List<CorrelationKeySet> subSets = keySet.findSubSets();
+            q = getSession().createQuery(generateSelectorQuery(FLTR_SELECTORS, subSets));
+            q.setEntity("correlator", getHibernateObj());
+
+            for( int i = 0; i < subSets.size(); i++ ) {
+                q.setString("s" + i, subSets.get(i).toCanonicalString());
+            }
+        }
+        // Make sure we obtain a lock for the selector we want to find.
+        q.setLockMode("hs", LockMode.UPGRADE);
+
+        List<HProcessInstance> targets = new ArrayList<HProcessInstance>();
+        List<HCorrelatorSelector> list;
+        try {
+            list = (List<HCorrelatorSelector>) q.list();
+        } catch (LockAcquisitionException e) {
+            throw new Scheduler.JobProcessorException(e, true);
+        }
+        for (HCorrelatorSelector selector : list) {
+            __log.debug("selector returned form findRoute {} and targets {}", selector.getInstance().getId(),targets);
+            if (selector != null) {
+                boolean isRoutePolicyOne = selector.getRoute() == null || "one".equals(selector.getRoute());
+                if ("all".equals(selector.getRoute()) ||
+                        (isRoutePolicyOne && !targets.contains(selector.getInstance()))) {
+                    __log.debug("selector added for targets {}", selector.getInstance().getId());
+                    routes.add(new MessageRouteDaoImpl(_sm, selector));
+                    targets.add(selector.getInstance());
+                }
+            }
+        }
+
+        __log.debug("findRoute(keySet={}) found {}",keySet,routes);
+
+        return routes;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ode/blob/58b1a735/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java
----------------------------------------------------------------------
diff --git a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java
index 9f6fbce..514fda8 100644
--- a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java
+++ b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java
@@ -101,41 +101,7 @@ public class CorrelatorDAOImpl extends OpenJPADAO implements CorrelatorDAO {
 
     @SuppressWarnings("unchecked")
     public List<MessageRouteDAO> findRoute(CorrelationKeySet correlationKeySet) {
-        if (__log.isDebugEnabled()) {
-            __log.debug("findRoute " + correlationKeySet);
-        }
-        List<CorrelationKeySet> subSets = correlationKeySet.findSubSets();
-        Query qry = getEM().createQuery(generateSelectorQuery(ROUTE_BY_CKEY_HEADER, subSets));
-        qry.setParameter("corr", this);
-        for (int i = 0; i < subSets.size(); i++) {
-            qry.setParameter("s" + i, subSets.get(i).toCanonicalString());
-        }
-
-        List<MessageRouteDAO> candidateRoutes = (List<MessageRouteDAO>) qry.getResultList();
-        if (candidateRoutes.size() > 0) {
-            List<MessageRouteDAO> matchingRoutes = new ArrayList<MessageRouteDAO>();
-            boolean routed = false;
-            for (int i = 0; i < candidateRoutes.size(); i++) {
-                MessageRouteDAO route = candidateRoutes.get(i);
-                if ("all".equals(route.getRoute())) {
-                    matchingRoutes.add(route);
-                } else {
-                    if (!routed) {
-                        matchingRoutes.add(route);
-                    }
-                    routed = true;
-                }
-            }
-            if (__log.isDebugEnabled()) {
-                __log.debug("findRoute found " + matchingRoutes);
-            }
-            return matchingRoutes;
-        } else {
-            if (__log.isDebugEnabled()) {
-                __log.debug("findRoute found nothing");
-            }
-            return null;
-        }
+        return findRoute(correlationKeySet, false);
     }
 
     private String generateSelectorQuery(String header, List<CorrelationKeySet> subSets) {
@@ -199,4 +165,46 @@ public class CorrelatorDAOImpl extends OpenJPADAO implements CorrelatorDAO {
         // TODO Auto-generated method stub
         return true;
     }
+
+    public List<MessageRouteDAO> findRoute(CorrelationKeySet correlationKeySet,boolean isCorrleationKeySetPreInitialized) {
+        __log.debug("findRoute {}", correlationKeySet);
+
+        Query qry = null;
+
+        if(isCorrleationKeySetPreInitialized){
+            qry = getEM().createQuery(ROUTE_BY_CKEY_HEADER + " and route._correlationKey = :s0");
+            qry.setParameter("corr", this);
+            qry.setParameter("s0", correlationKeySet.toCanonicalString());
+        } else {
+            List<CorrelationKeySet> subSets = correlationKeySet.findSubSets();
+            qry = getEM().createQuery(generateSelectorQuery(ROUTE_BY_CKEY_HEADER, subSets));
+            qry.setParameter("corr", this);
+            for (int i = 0; i < subSets.size(); i++) {
+                qry.setParameter("s" + i, subSets.get(i).toCanonicalString());
+            }
+        }
+
+        List<MessageRouteDAO> candidateRoutes = (List<MessageRouteDAO>) qry.getResultList();
+        if (candidateRoutes.size() > 0) {
+            List<MessageRouteDAO> matchingRoutes = new ArrayList<MessageRouteDAO>();
+            boolean routed = false;
+            for (int i = 0; i < candidateRoutes.size(); i++) {
+                MessageRouteDAO route = candidateRoutes.get(i);
+                if ("all".equals(route.getRoute())) {
+                    matchingRoutes.add(route);
+                } else {
+                    if (!routed) {
+                        matchingRoutes.add(route);
+                    }
+                    routed = true;
+                }
+            }
+
+            __log.debug("findRoute found {}",matchingRoutes);
+            return matchingRoutes;
+        } else {
+            __log.debug("findRoute found nothing");
+            return null;
+        }
+    }
 }