You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by sa...@apache.org on 2016/03/05 07:42:34 UTC
ode git commit: ODE-1049: Optimised findRoute and dequeueMessage
queries to use pre inistialized CorrelationKeySet cannonical value.
Repository: ode
Updated Branches:
refs/heads/ode-1.3.x 0342b4031 -> 58b1a735d
ODE-1049: Optimised findRoute and dequeueMessage queries to use pre inistialized CorrelationKeySet cannonical value.
Project: http://git-wip-us.apache.org/repos/asf/ode/repo
Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/58b1a735
Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/58b1a735
Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/58b1a735
Branch: refs/heads/ode-1.3.x
Commit: 58b1a735ddb58c381e7ef356e8c70f3f6f71aa93
Parents: 0342b40
Author: sathwik <sa...@apache.org>
Authored: Sat Mar 5 12:12:04 2016 +0530
Committer: sathwik <sa...@apache.org>
Committed: Sat Mar 5 12:12:04 2016 +0530
----------------------------------------------------------------------
.../org/apache/ode/bpel/dao/CorrelatorDAO.java | 15 +++
.../ode/bpel/engine/BpelRuntimeContextImpl.java | 14 ++-
.../ode/bpel/memdao/CorrelatorDaoImpl.java | 50 ++++-----
.../ode/daohib/bpel/CorrelatorDaoImpl.java | 103 ++++++++++---------
.../apache/ode/dao/jpa/CorrelatorDAOImpl.java | 78 +++++++-------
5 files changed, 147 insertions(+), 113 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ode/blob/58b1a735/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorDAO.java
----------------------------------------------------------------------
diff --git a/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorDAO.java b/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorDAO.java
index 7a2556c..21cd8eb 100644
--- a/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorDAO.java
+++ b/bpel-dao/src/main/java/org/apache/ode/bpel/dao/CorrelatorDAO.java
@@ -83,9 +83,11 @@ public interface CorrelatorDAO {
Collection<CorrelatorMessageDAO> getAllMessages();
/**
+ * @deprecated
* Find a route matching the given correlation key.
* @param correlationKey correlation key
* @return route matching the given correlation key
+ * @see findRoute(CorrelationKeySet correlationKeySet,boolean isCorrleationKeySetPreInitialized)
*/
List<MessageRouteDAO> findRoute(CorrelationKeySet correlationKeySet);
@@ -115,4 +117,17 @@ public interface CorrelatorDAO {
* @return all routes registered on this correlator, use with care as it can potentially return a lot of values
*/
Collection<MessageRouteDAO> getAllRoutes();
+
+ /**
+ * Find a route matching the given correlation key set.
+ * If the correlationKeySet is known to be pre initialized then isCorrleationKeySetPreInitialized can be set to true or false otherwise.
+ * Depending on the value of isCorrleationKeySetPreInitialized,
+ * true - canonical value of the correlationKeySet might be used to find the route.
+ * false - canonical value of each of the subset of correlationKeySet might be used to find the route.
+ *
+ * @param correlationKeySet correlation key
+ * @param isCorrleationKeySetPreInitialized
+ * @return route matching the given correlation key
+ */
+ List<MessageRouteDAO> findRoute(CorrelationKeySet correlationKeySet,boolean isCorrleationKeySetPreInitialized);
}
http://git-wip-us.apache.org/repos/asf/ode/blob/58b1a735/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
----------------------------------------------------------------------
diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
index aff7733..480094e 100644
--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
@@ -1482,11 +1482,13 @@ public class BpelRuntimeContextImpl implements BpelRuntimeContext {
if (BpelProcess.__log.isDebugEnabled()) {
__log.debug("MatcherEvent handling: correlatorId=" + correlatorId + ", ckeySet=" + ckeySet);
}
+
CorrelatorDAO correlator = _dao.getProcess().getCorrelator(correlatorId);
+
// Find the route first, this is a SELECT FOR UPDATE on the "selector" row,
// So we want to acquire the lock before we do anthing else.
- List<MessageRouteDAO> mroutes = correlator.findRoute(ckeySet);
+ List<MessageRouteDAO> mroutes = correlator.findRoute(ckeySet,true);
if (mroutes == null || mroutes.size() == 0) {
// Ok, this means that a message arrived before we did, so nothing to do.
__log.debug("MatcherEvent handling: nothing to do, route no longer in DB");
@@ -1495,13 +1497,15 @@ public class BpelRuntimeContextImpl implements BpelRuntimeContext {
// Now see if there is a message that matches this selector.
MessageExchangeDAO mexdao = correlator.dequeueMessage(ckeySet);
+
if (mexdao != null) {
__log.debug("MatcherEvent handling: found matching message in DB (i.e. message arrived before <receive>)");
- if( MessageExchangePattern.REQUEST_RESPONSE.toString().equals(mexdao.getPattern())) {
- __log.warn("A message arrived before a receive is ready for a request/response pattern. This may be processed to success. However, you should consider revising your process since a TCP port and a container thread will be held for a longer time and the process will not scale under heavy load.");
- }
-
+ if( MessageExchangePattern.REQUEST_RESPONSE.toString().equals(mexdao.getPattern())) {
+ __log.warn("A message arrived before a receive is ready for a request/response pattern. This may be processed to success. However, you should consider revising your process since a TCP port and a container thread will be held for a longer time and the process will not scale under heavy load.");
+ }
+
for (MessageRouteDAO mroute : mroutes) {
+ __log.debug("Removing routes for GroupID: {} Instance: {}",mroute.getGroupId(),_dao.getInstanceId());
// We have a match, so we can get rid of the routing entries.
correlator.removeRoutes(mroute.getGroupId(), _dao);
}
http://git-wip-us.apache.org/repos/asf/ode/blob/58b1a735/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java
----------------------------------------------------------------------
diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java
index 5de76f2..f6261d4 100644
--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java
+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/CorrelatorDaoImpl.java
@@ -66,30 +66,7 @@ class CorrelatorDaoImpl extends DaoBaseImpl implements CorrelatorDAO {
}
public List<MessageRouteDAO> findRoute(CorrelationKeySet keySet) {
- List<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>();
-
- assert keySet != null;
-
- if (__log.isDebugEnabled()) {
- __log.debug("findRoute: keySet=" + keySet);
- }
- boolean routed = false;
- for (MessageRouteDaoImpl route : _routes) {
- assert route._ckeySet != null;
-
- if(keySet.isRoutableTo(route._ckeySet, "all".equals(route.getRoute()))) {
- if ("all".equals(route.getRoute())) {
- routes.add(route);
- } else {
- if (!routed) {
- routes.add(route);
- }
- routed = true;
- }
- }
- }
-
- return routes;
+ return findRoute(keySet, false);
}
public String getCorrelatorId() {
@@ -186,4 +163,29 @@ class CorrelatorDaoImpl extends DaoBaseImpl implements CorrelatorDAO {
return true;
}
+ public List<MessageRouteDAO> findRoute(CorrelationKeySet correlationKeySet,boolean isCorrleationKeySetPreInitialized) {
+ List<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>();
+
+ assert correlationKeySet != null;
+ boolean routed = false;
+
+ __log.debug("findRoute: keySet={}",correlationKeySet);
+
+ for (MessageRouteDaoImpl route : _routes) {
+ assert route._ckeySet != null;
+
+ if(correlationKeySet.isRoutableTo(route._ckeySet, "all".equals(route.getRoute()))) {
+ if ("all".equals(route.getRoute())) {
+ routes.add(route);
+ } else {
+ if (!routed) {
+ routes.add(route);
+ }
+ routed = true;
+ }
+ }
+ }
+
+ return routes;
+ }
}
http://git-wip-us.apache.org/repos/asf/ode/blob/58b1a735/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
----------------------------------------------------------------------
diff --git a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
index 3f530cf..a467c2d 100644
--- a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
+++ b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
@@ -56,7 +56,7 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO {
"(select hc.id from HCorrelator as hc where hc.correlatorId = :correlatorId)").intern();
/** Query for removing routes. */
- private static final String QRY_DELSELECTORS = "delete from HCorrelatorSelector where groupId = ? and instance = ?";
+ private static final String QRY_DELSELECTORS = "delete from HCorrelatorSelector where instance = ? and groupId = ?";
private HCorrelator _hobj;
@@ -75,12 +75,8 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO {
__log.debug("dequeueMessage({}): ",keySet);
- List<CorrelationKeySet> subSets = keySet.findSubSets();
- Query qry = getSession().createFilter(_hobj.getMessageCorrelations(),
- generateUnmatchedQuery(subSets));
- for( int i = 0; i < subSets.size(); i++ ) {
- qry.setString("s" + i, subSets.get(i).toCanonicalString());
- }
+ Query qry = getSession().createFilter(_hobj.getMessageCorrelations()," where this.correlationKey = :s0");
+ qry.setString("s0", keySet.toCanonicalString());
// We really should consider the possibility of multiple messages matching a criteria.
// When the message is handled, its not too convenient to attempt to determine if the
@@ -120,46 +116,7 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO {
@SuppressWarnings("unchecked")
public List<MessageRouteDAO> findRoute(CorrelationKeySet keySet) {
- List<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>();
-
- entering("CorrelatorDaoImpl.findRoute");
- String hdr = "findRoute(keySet=" + keySet + "): ";
- if (__log.isDebugEnabled()) __log.debug(hdr);
-
- //String processType = new QName(_hobj.getProcess().getTypeNamespace(), _hobj.getProcess().getTypeName()).toString();
- List<CorrelationKeySet> subSets = keySet.findSubSets();
-
- //Query q = getSession().createQuery(generateSelectorQuery(_sm.canJoinForUpdate() ? FLTR_SELECTORS : FLTR_SELECTORS_SUBQUERY, subSets));
- Query q = getSession().createQuery(generateSelectorQuery(FLTR_SELECTORS, subSets));
- q.setEntity("correlator", getHibernateObj());
-
- for( int i = 0; i < subSets.size(); i++ ) {
- q.setString("s" + i, subSets.get(i).toCanonicalString());
- }
- // Make sure we obtain a lock for the selector we want to find.
- q.setLockMode("hs", LockMode.UPGRADE);
-
- List<HProcessInstance> targets = new ArrayList<HProcessInstance>();
- List<HCorrelatorSelector> list;
- try {
- list = (List<HCorrelatorSelector>) q.list();
- } catch (LockAcquisitionException e) {
- throw new Scheduler.JobProcessorException(e, true);
- }
- for (HCorrelatorSelector selector : list) {
- if (selector != null) {
- boolean isRoutePolicyOne = selector.getRoute() == null || "one".equals(selector.getRoute());
- if ("all".equals(selector.getRoute()) ||
- (isRoutePolicyOne && !targets.contains(selector.getInstance()))) {
- routes.add(new MessageRouteDaoImpl(_sm, selector));
- targets.add(selector.getInstance());
- }
- }
- }
-
- if(__log.isDebugEnabled()) __log.debug(hdr + "found " + routes);
-
- return routes;
+ return findRoute(keySet,false);
}
private String generateUnmatchedQuery(List<CorrelationKeySet> subSets) {
@@ -271,8 +228,8 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO {
__log.debug(hdr);
Session session = getSession();
Query q = session.createQuery(QRY_DELSELECTORS);
- q.setString(0, routeGroupId); // groupId
- q.setEntity(1, ((ProcessInstanceDaoImpl) target).getHibernateObj()); // instance
+ q.setEntity(0, ((ProcessInstanceDaoImpl) target).getHibernateObj()); // instance
+ q.setString(1, routeGroupId); // groupId
int updates = q.executeUpdate();
session.flush(); // explicit flush to ensure route removed
if (__log.isDebugEnabled())
@@ -293,4 +250,52 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO {
return routes;
}
+ public List<MessageRouteDAO> findRoute(CorrelationKeySet keySet,boolean isCorrleationKeySetPreInitialized) {
+ List<MessageRouteDAO> routes = new ArrayList<MessageRouteDAO>();
+
+ entering("CorrelatorDaoImpl.findRoute");
+ __log.debug("findRoute(keySet={})",keySet);
+
+ //Query q = getSession().createQuery(generateSelectorQuery(_sm.canJoinForUpdate() ? FLTR_SELECTORS : FLTR_SELECTORS_SUBQUERY, subSets));
+ Query q = null;
+ if(isCorrleationKeySetPreInitialized){
+ q = getSession().createQuery(FLTR_SELECTORS + " and hs.correlationKey = :s0");
+ q.setEntity("correlator", getHibernateObj());
+ q.setString("s0", keySet.toCanonicalString());
+ } else {
+ List<CorrelationKeySet> subSets = keySet.findSubSets();
+ q = getSession().createQuery(generateSelectorQuery(FLTR_SELECTORS, subSets));
+ q.setEntity("correlator", getHibernateObj());
+
+ for( int i = 0; i < subSets.size(); i++ ) {
+ q.setString("s" + i, subSets.get(i).toCanonicalString());
+ }
+ }
+ // Make sure we obtain a lock for the selector we want to find.
+ q.setLockMode("hs", LockMode.UPGRADE);
+
+ List<HProcessInstance> targets = new ArrayList<HProcessInstance>();
+ List<HCorrelatorSelector> list;
+ try {
+ list = (List<HCorrelatorSelector>) q.list();
+ } catch (LockAcquisitionException e) {
+ throw new Scheduler.JobProcessorException(e, true);
+ }
+ for (HCorrelatorSelector selector : list) {
+ __log.debug("selector returned form findRoute {} and targets {}", selector.getInstance().getId(),targets);
+ if (selector != null) {
+ boolean isRoutePolicyOne = selector.getRoute() == null || "one".equals(selector.getRoute());
+ if ("all".equals(selector.getRoute()) ||
+ (isRoutePolicyOne && !targets.contains(selector.getInstance()))) {
+ __log.debug("selector added for targets {}", selector.getInstance().getId());
+ routes.add(new MessageRouteDaoImpl(_sm, selector));
+ targets.add(selector.getInstance());
+ }
+ }
+ }
+
+ __log.debug("findRoute(keySet={}) found {}",keySet,routes);
+
+ return routes;
+ }
}
http://git-wip-us.apache.org/repos/asf/ode/blob/58b1a735/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java
----------------------------------------------------------------------
diff --git a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java
index 9f6fbce..514fda8 100644
--- a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java
+++ b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java
@@ -101,41 +101,7 @@ public class CorrelatorDAOImpl extends OpenJPADAO implements CorrelatorDAO {
@SuppressWarnings("unchecked")
public List<MessageRouteDAO> findRoute(CorrelationKeySet correlationKeySet) {
- if (__log.isDebugEnabled()) {
- __log.debug("findRoute " + correlationKeySet);
- }
- List<CorrelationKeySet> subSets = correlationKeySet.findSubSets();
- Query qry = getEM().createQuery(generateSelectorQuery(ROUTE_BY_CKEY_HEADER, subSets));
- qry.setParameter("corr", this);
- for (int i = 0; i < subSets.size(); i++) {
- qry.setParameter("s" + i, subSets.get(i).toCanonicalString());
- }
-
- List<MessageRouteDAO> candidateRoutes = (List<MessageRouteDAO>) qry.getResultList();
- if (candidateRoutes.size() > 0) {
- List<MessageRouteDAO> matchingRoutes = new ArrayList<MessageRouteDAO>();
- boolean routed = false;
- for (int i = 0; i < candidateRoutes.size(); i++) {
- MessageRouteDAO route = candidateRoutes.get(i);
- if ("all".equals(route.getRoute())) {
- matchingRoutes.add(route);
- } else {
- if (!routed) {
- matchingRoutes.add(route);
- }
- routed = true;
- }
- }
- if (__log.isDebugEnabled()) {
- __log.debug("findRoute found " + matchingRoutes);
- }
- return matchingRoutes;
- } else {
- if (__log.isDebugEnabled()) {
- __log.debug("findRoute found nothing");
- }
- return null;
- }
+ return findRoute(correlationKeySet, false);
}
private String generateSelectorQuery(String header, List<CorrelationKeySet> subSets) {
@@ -199,4 +165,46 @@ public class CorrelatorDAOImpl extends OpenJPADAO implements CorrelatorDAO {
// TODO Auto-generated method stub
return true;
}
+
+ public List<MessageRouteDAO> findRoute(CorrelationKeySet correlationKeySet,boolean isCorrleationKeySetPreInitialized) {
+ __log.debug("findRoute {}", correlationKeySet);
+
+ Query qry = null;
+
+ if(isCorrleationKeySetPreInitialized){
+ qry = getEM().createQuery(ROUTE_BY_CKEY_HEADER + " and route._correlationKey = :s0");
+ qry.setParameter("corr", this);
+ qry.setParameter("s0", correlationKeySet.toCanonicalString());
+ } else {
+ List<CorrelationKeySet> subSets = correlationKeySet.findSubSets();
+ qry = getEM().createQuery(generateSelectorQuery(ROUTE_BY_CKEY_HEADER, subSets));
+ qry.setParameter("corr", this);
+ for (int i = 0; i < subSets.size(); i++) {
+ qry.setParameter("s" + i, subSets.get(i).toCanonicalString());
+ }
+ }
+
+ List<MessageRouteDAO> candidateRoutes = (List<MessageRouteDAO>) qry.getResultList();
+ if (candidateRoutes.size() > 0) {
+ List<MessageRouteDAO> matchingRoutes = new ArrayList<MessageRouteDAO>();
+ boolean routed = false;
+ for (int i = 0; i < candidateRoutes.size(); i++) {
+ MessageRouteDAO route = candidateRoutes.get(i);
+ if ("all".equals(route.getRoute())) {
+ matchingRoutes.add(route);
+ } else {
+ if (!routed) {
+ matchingRoutes.add(route);
+ }
+ routed = true;
+ }
+ }
+
+ __log.debug("findRoute found {}",matchingRoutes);
+ return matchingRoutes;
+ } else {
+ __log.debug("findRoute found nothing");
+ return null;
+ }
+ }
}