You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by sa...@apache.org on 2016/01/20 12:28:57 UTC
ode git commit: ODE-974: Routing logic for messages arriving early
has been fixed
Repository: ode
Updated Branches:
refs/heads/ode-1.3.x ba61210b4 -> 501f70f20
ODE-974: Routing logic for messages arriving early has been fixed
Project: http://git-wip-us.apache.org/repos/asf/ode/repo
Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/501f70f2
Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/501f70f2
Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/501f70f2
Branch: refs/heads/ode-1.3.x
Commit: 501f70f200d02d82fc0607ccd5d7d8b76b000b6e
Parents: ba61210
Author: sathwik <sa...@apache.org>
Authored: Wed Jan 20 16:56:36 2016 +0530
Committer: sathwik <sa...@apache.org>
Committed: Wed Jan 20 16:56:36 2016 +0530
----------------------------------------------------------------------
.../org/apache/ode/bpel/dao/ProcessDAO.java | 10 ++
.../apache/ode/bpel/engine/BpelEngineImpl.java | 14 +-
.../org/apache/ode/bpel/engine/BpelProcess.java | 141 ++++++++++++++-----
.../ode/bpel/engine/BpelRuntimeContextImpl.java | 5 +
.../ode/bpel/engine/PartnerLinkMyRoleImpl.java | 74 ++++++----
.../apache/ode/bpel/memdao/ProcessDaoImpl.java | 6 +
.../ode/daohib/bpel/CorrelatorDaoImpl.java | 13 +-
.../apache/ode/daohib/bpel/ProcessDaoImpl.java | 39 ++++-
.../ode/daohib/bpel/hobj/HCorrelationSet.java | 5 +
.../apache/ode/dao/jpa/CorrelatorDAOImpl.java | 10 +-
.../ode/dao/jpa/MessageExchangeDAOImpl.java | 2 +-
.../org/apache/ode/dao/jpa/ProcessDAOImpl.java | 14 +-
12 files changed, 251 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ode/blob/501f70f2/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessDAO.java
----------------------------------------------------------------------
diff --git a/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessDAO.java b/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessDAO.java
index 97c252b..c68ac53 100644
--- a/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessDAO.java
+++ b/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessDAO.java
@@ -118,4 +118,14 @@ public interface ProcessDAO {
* @return all instances that haven't completed, use with care as there could be a lot of them
*/
Collection<ProcessInstanceDAO> getActiveInstances();
+
+ /**
+ * Locates process instances for a specific process version that matches correlation key and instance state
+ * @param ckey
+ * Correlation key
+ * @param processInstanceState
+ * Instance state org.apache.ode.bpel.common.ProcessState
+ * @return collection of {@link ProcessInstanceDAO} that match correlation key, instance state
+ */
+ Collection<ProcessInstanceDAO> findInstance(CorrelationKey ckey,short processInstanceState);
}
http://git-wip-us.apache.org/repos/asf/ode/blob/501f70f2/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
----------------------------------------------------------------------
diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
index 6a1edce..03efdd6 100644
--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
@@ -483,15 +483,25 @@ public class BpelEngineImpl implements BpelEngine {
if (we.getType() == JobType.INVOKE_INTERNAL || we.getType() == JobType.MEX_MATCHER) {
List<BpelProcess> processes = getAllProcesses(we.getProcessId());
boolean routed = false;
+
+ //try to find the target process and execute
jobInfo.jobDetail.detailsExt.put("enqueue", false);
for(BpelProcess proc : processes) {
- routed = routed || proc.handleJobDetails(jobInfo.jobDetail);
+ routed = proc.handleJobDetails(jobInfo.jobDetail);
+
+ if(routed) break;
}
+ //no target process was identified, enqueue the mex for later processing
if(!routed && we.getType() == JobType.INVOKE_INTERNAL) {
jobInfo.jobDetail.detailsExt.put("enqueue", true);
- process.handleJobDetails(jobInfo.jobDetail);
+
+ for(BpelProcess proc : processes) {
+ routed = proc.handleJobDetails(jobInfo.jobDetail);
+
+ if(routed) break;
+ }
}
}
else {
http://git-wip-us.apache.org/repos/asf/ode/blob/501f70f2/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
----------------------------------------------------------------------
diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
index b5cef27..1a7ca92 100644
--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
@@ -21,9 +21,11 @@ package org.apache.ode.bpel.engine;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -36,12 +38,15 @@ import javax.xml.namespace.QName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.ode.agents.memory.SizingAgent;
+import org.apache.ode.bpel.common.CorrelationKey;
+import org.apache.ode.bpel.common.CorrelationKeySet;
import org.apache.ode.bpel.common.FaultException;
import org.apache.ode.bpel.common.ProcessState;
import org.apache.ode.bpel.dao.BpelDAOConnection;
import org.apache.ode.bpel.dao.DeferredProcessInstanceCleanable;
import org.apache.ode.bpel.dao.ProcessDAO;
import org.apache.ode.bpel.dao.ProcessInstanceDAO;
+import org.apache.ode.bpel.engine.PartnerLinkMyRoleImpl.RoutingInfo;
import org.apache.ode.bpel.engine.extvar.ExternalVariableConf;
import org.apache.ode.bpel.engine.extvar.ExternalVariableManager;
import org.apache.ode.bpel.evt.ProcessInstanceEvent;
@@ -51,6 +56,7 @@ import org.apache.ode.bpel.iapi.BpelEngineException;
import org.apache.ode.bpel.iapi.Endpoint;
import org.apache.ode.bpel.iapi.EndpointReference;
import org.apache.ode.bpel.iapi.MessageExchange;
+import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
import org.apache.ode.bpel.iapi.PartnerRoleChannel;
import org.apache.ode.bpel.iapi.ProcessConf;
import org.apache.ode.bpel.iapi.Scheduler;
@@ -217,38 +223,17 @@ public class BpelProcess {
return false;
}
- mex.getDAO().setProcess(getProcessDAO());
-
- if (!processInterceptors(mex, InterceptorInvoker.__onProcessInvoked)) {
- __log.debug("Aborting processing of mex " + mex + " due to interceptors.");
- return false;
- }
-
- markused();
-
- // Ideally, if Java supported closure, the routing code would return null or the appropriate
- // closure to handle the route.
- List<PartnerLinkMyRoleImpl.RoutingInfo> routings = null;
- for (PartnerLinkMyRoleImpl target : targets) {
- routings = target.findRoute(mex);
- boolean createInstance = target.isCreateInstance(mex);
-
- if (mex.getStatus() != MessageExchange.Status.FAILURE && routings!=null) {
- for (PartnerLinkMyRoleImpl.RoutingInfo routing : routings) {
- routed = routed || invokeHandler.invoke(target, routing, createInstance);
- }
- }
- if (routed) {
- break;
- }
+ //Actual identification of the routes and invocation of the target process will happen when enqueue is disabled.
+ //This is the main conditional block of code that does the actual work.
+ //Its only after running this block with enqueue disabled, it can be identified that mex was not routable.
+ //There is a separate logic following this 'if' condition to handle the mex when enqueue is enabled.
+ if(!enqueue) {
+ routed = findRouteAndInvoke(targets, mex, invokeHandler);
}
// Nothing found, saving for later
- if (!routed && enqueue) {
- // TODO this is kind of hackish when no match and more than one myrole is selected.
- // we save the routing on the last myrole
- // actually the message queue should be attached to the instance instead of the correlator
- targets.get(targets.size()-1).noRoutingMatch(mex, routings);
+ if (enqueue && !routed) {
+ routed = noRoutingMatch(targets, mex);
} else {
// Now we have to update our message exchange status. If the <reply> was not hit during the
// invocation, then we will be in the "REQUEST" phase which means that either this was a one-way
@@ -263,12 +248,100 @@ public class BpelProcess {
_hydrationLatch.release(1);
}
- // For a one way, once the engine is done, the mex can be safely released.
- // Sean: not really, if route is not found, we cannot delete the mex yet
-// if (mex.getPattern().equals(MessageExchange.MessageExchangePattern.REQUEST_ONLY) && routed && getCleanupCategories(false).contains(CLEANUP_CATEGORY.MESSAGES)) {
-// mex.release();
-// }
+ return routed;
+ }
+
+ //this method should be invoked within the ambit of _hydrationLatch
+ private boolean findRouteAndInvoke(List<PartnerLinkMyRoleImpl> targets, MyRoleMessageExchangeImpl mex, InvokeHandler invokeHandler) {
+ boolean routed = false;
+
+ //if mex is already queued then disallow overriding of process on the mex
+ if(!MyRoleMessageExchange.CorrelationStatus.QUEUED.equals(mex.getCorrelationStatus()))
+ mex.getDAO().setProcess(getProcessDAO());
+
+ if (!processInterceptors(mex, InterceptorInvoker.__onProcessInvoked)) {
+ __log.debug("Aborting processing of mex " + mex + " due to interceptors.");
+ return false;
+ }
+
+ markused();
+
+ // Ideally, if Java supported closure, the routing code would return null or the appropriate
+ // closure to handle the route.
+ List<PartnerLinkMyRoleImpl.RoutingInfo> routings = null;
+ for (PartnerLinkMyRoleImpl target : targets) {
+ routings = target.findRoute(mex);
+ boolean createInstance = target.isCreateInstance(mex);
+
+ if (mex.getStatus() != MessageExchange.Status.FAILURE && routings!=null) {
+ for (PartnerLinkMyRoleImpl.RoutingInfo routing : routings) {
+ routed = routed || invokeHandler.invoke(target, routing, createInstance);
+ }
+ }
+ if (routed) {
+ break;
+ }
+ }
+ return routed;
+ }
+
+ //this method should be invoked within the ambit of _hydrationLatch
+ private boolean noRoutingMatch(List<PartnerLinkMyRoleImpl> targets,MyRoleMessageExchangeImpl mex) {
+ boolean routed = false;
+ boolean enqueue = true;
+ List<PartnerLinkMyRoleImpl.RoutingInfo> routings = null;
+
+ /* try to find the active instance that is waiting on the correlated value and then associate the corresponding
+ * processDAO,correlator on the mex and enqueue for later processing.
+ */
+
+ // Need to iterate over the partnerlinks to identify the partnerlink that has the operation defined in the mex.
+ for (Iterator<PartnerLinkMyRoleImpl> targetItr = targets.iterator();
+ (targetItr.hasNext() && !routed);) {
+
+ PartnerLinkMyRoleImpl target = targetItr.next();
+ routings = target.findRoute(mex,enqueue);
+
+ //routings will be null if the mex operation is no defined in this myRole, iterate over next myRole.
+ if (routings != null) {
+ RoutingInfo routing = routings.get(routings.size()-1);
+
+ if (routing != null) {
+
+ for( Iterator<CorrelationKeySet> aSubSetItr = routing.wholeKeySet.findSubSets().iterator();
+ (aSubSetItr.hasNext() && !routed);) {
+ CorrelationKeySet aSubSet = aSubSetItr.next();
+
+ for(Iterator<CorrelationKey> keyItr = aSubSet.iterator();
+ (keyItr.hasNext() && !routed);) {
+
+ CorrelationKey key = keyItr.next();
+ __log.info("noRoutingMatch: Finding active instance correlated with {} and process pid {}",new Object[]{key,_pid});
+
+ // Assumption is, process instance is uniquely identifiable by any single initiated correlation key across multiple versions
+ // of a same process type.
+
+ // We need to make sure the PID of process of the instance is same as that of the
+ // partnerlink's associated process in the iteration. Otherwise we might end up
+ // associating wrong correlator with the mex.
+ Collection<ProcessInstanceDAO> instanceDaoList = getProcessDAO().findInstance(key,ProcessState.STATE_ACTIVE);
+
+ if (!instanceDaoList.isEmpty()) {
+ ProcessInstanceDAO instance = instanceDaoList.iterator().next();
+ mex.getDAO().setProcess(instance.getProcess());
+ target.noRoutingMatch(mex, routing);
+ routed = true;
+
+ __log.info("noRoutingMatch: Active instance found instanceID: {} correlated with {} and process pid {}",
+ new Object[]{instance.getInstanceId(),key,_pid});
+
+ }
+ }
+ }
+ }
+ }
+ }
return routed;
}
http://git-wip-us.apache.org/repos/asf/ode/blob/501f70f2/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
----------------------------------------------------------------------
diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
index 8ac70c3..aff7733 100644
--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
@@ -1517,6 +1517,11 @@ public class BpelRuntimeContextImpl implements BpelRuntimeContext {
MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(_bpelProcess, _bpelProcess._engine, mexdao);
+ mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.MATCHED);
+ mex.getDAO().setInstance(_dao);
+ if (mex.getDAO().getCreateTime() == null)
+ mex.getDAO().setCreateTime(getCurrentEventDateTime());
+
inputMsgMatch(mroute.getGroupId(), mroute.getIndex(), mex);
execute();
http://git-wip-us.apache.org/repos/asf/ode/blob/501f70f2/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
----------------------------------------------------------------------
diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
index 203a199..42dfd7a 100644
--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
@@ -94,8 +94,12 @@ public class PartnerLinkMyRoleImpl extends PartnerLinkRoleImpl {
}
public List<RoutingInfo> findRoute(MyRoleMessageExchangeImpl mex) {
- List<RoutingInfo> routingInfos = new ArrayList<RoutingInfo>();
-
+ return findRoute(mex, false);
+ }
+
+ public List<RoutingInfo> findRoute(MyRoleMessageExchangeImpl mex, boolean enqueue) {
+ List<RoutingInfo> routingInfos = new ArrayList<RoutingInfo>();
+
if (__log.isTraceEnabled()) {
__log.trace(ObjectPrinter.stringifyMethodEnter(this + ":inputMsgRcvd", new Object[] {
"messageExchange", mex }));
@@ -132,7 +136,7 @@ public class PartnerLinkMyRoleImpl extends PartnerLinkRoleImpl {
mex.setFailure(MessageExchange.FailureType.FORMAT_ERROR, ime.getMessage(), null);
return null;
}
-
+
String mySessionId = mex.getProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID);
String partnerSessionId = mex.getProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID);
if (__log.isDebugEnabled()) {
@@ -141,19 +145,22 @@ public class PartnerLinkMyRoleImpl extends PartnerLinkRoleImpl {
+ " partnerSessionId=" + partnerSessionId);
}
- // Try to find a route for one of our keys.
- List<MessageRouteDAO> messageRoutes = correlator.findRoute(keySet);
- if (messageRoutes != null && messageRoutes.size() > 0) {
- for (MessageRouteDAO messageRoute : messageRoutes) {
- if (__log.isDebugEnabled()) {
- __log.debug("INPUTMSG: " + correlatorId + ": ckeySet " + messageRoute.getCorrelationKeySet() + " route is to " + messageRoute);
+ //Avoid searching for message route when enqueue is enabled. It is only when no message route is found, enqueue will be enabled.
+ if(!enqueue){
+ // Try to find a route for one of our keys.
+ List<MessageRouteDAO> messageRoutes = correlator.findRoute(keySet);
+ if (messageRoutes != null && messageRoutes.size() > 0) {
+ for (MessageRouteDAO messageRoute : messageRoutes) {
+ if (__log.isDebugEnabled()) {
+ __log.debug("INPUTMSG: " + correlatorId + ": ckeySet " + messageRoute.getCorrelationKeySet() + " route is to " + messageRoute);
+ }
+ routingInfos.add(new RoutingInfo(messageRoute, messageRoute.getCorrelationKeySet(), correlator, keySet));
}
- routingInfos.add(new RoutingInfo(messageRoute, messageRoute.getCorrelationKeySet(), correlator, keySet));
}
}
-
+
if (routingInfos.size() == 0) {
- routingInfos.add(new RoutingInfo(null, null, correlator, keySet));
+ routingInfos.add(new RoutingInfo(null, null, correlator, keySet));
}
return routingInfos;
@@ -252,31 +259,38 @@ public class PartnerLinkMyRoleImpl extends PartnerLinkRoleImpl {
instance.execute();
}
+ /**
+ * @deprecated - Now an attempt is made to identify the correct routing in the BpelProcess.invokeProcess() with enqueue enabled.
+ * @param mex
+ * @param routings
+ */
public void noRoutingMatch(MyRoleMessageExchangeImpl mex, List<RoutingInfo> routings) {
+ // enqueue message with the last message route, as per the comments in caller (@see BpelProcess.invokeProcess())
+ RoutingInfo routing = (routings != null && routings.size() > 0) ? routings.get(routings.size() - 1) : null;
+ noRoutingMatch(mex, routing);
+ }
+
+ public void noRoutingMatch(MyRoleMessageExchangeImpl mex, RoutingInfo routing) {
if (!mex.isAsynchronous()) {
mex.setFailure(MessageExchange.FailureType.NOMATCH, "No process instance matching correlation keys.", null);
} else {
- // enqueue message with the last message route, as per the comments in caller (@see BpelProcess.invokeProcess())
- RoutingInfo routing =
- (routings != null && routings.size() > 0) ?
- routings.get(routings.size() - 1) : null;
- if (routing != null) {
+ if (routing != null) {
if (__log.isDebugEnabled()) {
__log.debug("INPUTMSG: " + routing.correlator.getCorrelatorId() + ": SAVING to DB (no match) ");
}
- // send event
- CorrelationNoMatchEvent evt = new CorrelationNoMatchEvent(mex.getPortType().getQName(), mex
- .getOperation().getName(), mex.getMessageExchangeId(), routing.wholeKeySet);
-
- evt.setProcessId(_process.getProcessDAO().getProcessId());
- evt.setProcessName(new QName(_process.getOProcess().targetNamespace, _process.getOProcess().getName()));
- _process._debugger.onEvent(evt);
-
- mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.QUEUED);
-
- // No match, means we add message exchange to the queue.
- routing.correlator.enqueueMessage(mex.getDAO(), routing.wholeKeySet);
+ // send event
+ CorrelationNoMatchEvent evt = new CorrelationNoMatchEvent(mex.getPortType().getQName(), mex
+ .getOperation().getName(), mex.getMessageExchangeId(), routing.wholeKeySet);
+
+ evt.setProcessId(_process.getProcessDAO().getProcessId());
+ evt.setProcessName(new QName(_process.getOProcess().targetNamespace, _process.getOProcess().getName()));
+ _process._debugger.onEvent(evt);
+
+ mex.setCorrelationStatus(MyRoleMessageExchange.CorrelationStatus.QUEUED);
+
+ // No match, means we add message exchange to the queue.
+ routing.correlator.enqueueMessage(mex.getDAO(), routing.wholeKeySet);
//Second matcher needs to be registered here
JobDetails we = new JobDetails();
@@ -289,7 +303,7 @@ public class PartnerLinkMyRoleImpl extends PartnerLinkRoleImpl {
}else{
_process._engine._contexts.scheduler.schedulePersistedJob(we, null);
}
- }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ode/blob/501f70f2/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java
----------------------------------------------------------------------
diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java
index 3e1ebd7..41d94f2 100644
--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java
+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java
@@ -251,4 +251,10 @@ class ProcessDaoImpl extends DaoBaseImpl implements ProcessDAO {
}
}
}
+
+ @Override
+ public Collection<ProcessInstanceDAO> findInstance(CorrelationKey cckey, short processInstanceState) {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/ode/blob/501f70f2/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
----------------------------------------------------------------------
diff --git a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
index c4cbd6c..11aaf6b 100644
--- a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
+++ b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
@@ -50,9 +50,10 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO {
/** filter for finding a matching selector. */
private static final String LOCK_SELECTORS = "update from HCorrelatorSelector as hs set hs.lock = hs.lock+1 where hs.processType = :processType";
private static final String CHECK_SELECTORS = "from HCorrelatorSelector as hs where hs.processType = :processType and hs.correlator.correlatorId = :correlatorId";
- private static final String FLTR_SELECTORS = "from HCorrelatorSelector as hs where hs.processType = :processType and hs.correlator.correlatorId = :correlatorId";
+ //private static final String FLTR_SELECTORS = "from HCorrelatorSelector as hs where hs.processType = :processType and hs.correlator.correlatorId = :correlatorId";
+ private static final String FLTR_SELECTORS = "from HCorrelatorSelector as hs where hs.correlator = :correlator";
private static final String FLTR_SELECTORS_SUBQUERY = ("from HCorrelatorSelector as hs where hs.processType = :processType and hs.correlatorId = " +
- "(select hc.id from HCorrelator as hc where hc.correlatorId = :correlatorId )").intern();
+ "(select hc.id from HCorrelator as hc where hc.correlatorId = :correlatorId)").intern();
/** Query for removing routes. */
private static final String QRY_DELSELECTORS = "delete from HCorrelatorSelector where groupId = ? and instance = ?";
@@ -115,12 +116,12 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO {
String hdr = "findRoute(keySet=" + keySet + "): ";
if (__log.isDebugEnabled()) __log.debug(hdr);
- String processType = new QName(_hobj.getProcess().getTypeNamespace(), _hobj.getProcess().getTypeName()).toString();
+ //String processType = new QName(_hobj.getProcess().getTypeNamespace(), _hobj.getProcess().getTypeName()).toString();
List<CorrelationKeySet> subSets = keySet.findSubSets();
- Query q = getSession().createQuery(generateSelectorQuery(_sm.canJoinForUpdate() ? FLTR_SELECTORS : FLTR_SELECTORS_SUBQUERY, subSets));
- q.setString("processType", processType);
- q.setString("correlatorId", _hobj.getCorrelatorId());
+ //Query q = getSession().createQuery(generateSelectorQuery(_sm.canJoinForUpdate() ? FLTR_SELECTORS : FLTR_SELECTORS_SUBQUERY, subSets));
+ Query q = getSession().createQuery(generateSelectorQuery(FLTR_SELECTORS, subSets));
+ q.setEntity("correlator", getHibernateObj());
for( int i = 0; i < subSets.size(); i++ ) {
q.setString("s" + i, subSets.get(i).toCanonicalString());
http://git-wip-us.apache.org/repos/asf/ode/blob/501f70f2/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessDaoImpl.java
----------------------------------------------------------------------
diff --git a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessDaoImpl.java b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessDaoImpl.java
index 278eb73..8d45610 100644
--- a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessDaoImpl.java
+++ b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessDaoImpl.java
@@ -35,6 +35,7 @@ import org.apache.ode.bpel.dao.CorrelatorDAO;
import org.apache.ode.bpel.dao.DeferredProcessInstanceCleanable;
import org.apache.ode.bpel.dao.ProcessDAO;
import org.apache.ode.bpel.dao.ProcessInstanceDAO;
+import org.apache.ode.bpel.dao.ScopeDAO;
import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY;
import org.apache.ode.daohib.SessionManager;
import org.apache.ode.daohib.bpel.hobj.HActivityRecovery;
@@ -54,6 +55,8 @@ import org.apache.ode.daohib.bpel.hobj.HProcessInstance;
import org.apache.ode.daohib.bpel.hobj.HScope;
import org.apache.ode.daohib.bpel.hobj.HVariableProperty;
import org.apache.ode.daohib.bpel.hobj.HXmlData;
+import org.apache.ode.utils.stl.CollectionsX;
+import org.apache.ode.utils.stl.UnaryFunction;
import org.hibernate.Criteria;
import org.hibernate.Hibernate;
import org.hibernate.Query;
@@ -134,11 +137,18 @@ public class ProcessDaoImpl extends HibernateDao implements ProcessDAO, Deferred
@SuppressWarnings("unchecked")
public Collection<ProcessInstanceDAO> findInstance(CorrelationKey ckeyValue) {
entering("ProcessDaoImpl.findInstance");
- Criteria criteria = getSession().createCriteria(HCorrelationSet.class);
- criteria.add(Expression.eq("scope.instance.process.id",_process.getId()));
- criteria.add(Expression.eq("value", ckeyValue.toCanonicalString()));
- criteria.addOrder(Order.desc("scope.instance.created"));
- return criteria.list();
+ Query qry = getSession().getNamedQuery(HCorrelationSet.SELECT_INSTANCES_BY_CORSETS);
+ qry.setParameter("ckey", ckeyValue.toCanonicalString());
+ Collection<HProcessInstance> resultList = qry.list();
+
+ ArrayList<ProcessInstanceDAO> ret = new ArrayList<ProcessInstanceDAO>();
+ CollectionsX.transform(ret, resultList, new UnaryFunction<HProcessInstance,ProcessInstanceDAO> () {
+ public ProcessInstanceDAO apply(HProcessInstance x) {
+ return new ProcessInstanceDaoImpl(_sm, x);
+ }
+ });
+
+ return ret;
}
/**
@@ -296,4 +306,23 @@ public class ProcessDaoImpl extends HibernateDao implements ProcessDAO, Deferred
public String getGuid() {
return _process.getGuid();
}
+
+ @Override
+ public Collection<ProcessInstanceDAO> findInstance(CorrelationKey ckey, short processInstanceState) {
+ entering("ProcessDaoImpl.findInstance");
+ Query qry = getSession().getNamedQuery(HCorrelationSet.SELECT_INSTANCES_BY_CORSETS_STATE_PROCESS);
+ qry.setParameter("ckey", ckey.toCanonicalString());
+ qry.setEntity("process", getHibernateObj());
+ qry.setShort("state", processInstanceState);
+ Collection<HProcessInstance> resultList = qry.list();
+
+ ArrayList<ProcessInstanceDAO> ret = new ArrayList<ProcessInstanceDAO>();
+ CollectionsX.transform(ret, resultList, new UnaryFunction<HProcessInstance,ProcessInstanceDAO> () {
+ public ProcessInstanceDAO apply(HProcessInstance x) {
+ return new ProcessInstanceDaoImpl(_sm, x);
+ }
+ });
+
+ return ret;
+ }
}
http://git-wip-us.apache.org/repos/asf/ode/blob/501f70f2/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelationSet.java
----------------------------------------------------------------------
diff --git a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelationSet.java b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelationSet.java
index 1c0dba9..1656989 100644
--- a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelationSet.java
+++ b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelationSet.java
@@ -28,11 +28,16 @@ import java.util.Collection;
* @hibernate.query name="SELECT_CORSET_IDS_BY_INSTANCES" query="select id from HCorrelationSet as c where c.instance in (:instances)"
* @hibernate.query name="SELECT_CORSETS_BY_INSTANCES" query="from HCorrelationSet as c left join fetch c.properties where c.instance.id in (:instances)"
* @hibernate.query name="SELECT_CORSETS_BY_PROCESS_STATES" query="from HCorrelationSet as c left join fetch c.process left join fetch c.instance where c.instance.state in (:states)"
+ * @hibernate.query name="SELECT_INSTANCES_BY_CORSETS" query="select cs.scope.instance from HCorrelationSet as cs where cs.value = :ckey"
+ * @hibernate.query name="SELECT_INSTANCES_BY_CORSETS_STATE_PROCESS" query="select cs.scope.instance from HCorrelationSet as cs where cs.value = :ckey and
+ * cs.process = :process and cs.instance.state = :state"
*/
public class HCorrelationSet extends HObject{
public static final String SELECT_CORSET_IDS_BY_INSTANCES = "SELECT_CORSET_IDS_BY_INSTANCES";
public static final String SELECT_CORSETS_BY_INSTANCES = "SELECT_CORSETS_BY_INSTANCES";
public static final String SELECT_CORSETS_BY_PROCESS_STATES = "SELECT_CORSETS_BY_PROCESS_STATES";
+ public static final String SELECT_INSTANCES_BY_CORSETS = "SELECT_INSTANCES_BY_CORSETS";
+ public static final String SELECT_INSTANCES_BY_CORSETS_STATE_PROCESS = "SELECT_INSTANCES_BY_CORSETS_STATE_PROCESS";
private HProcess _process;
private HProcessInstance _instance;
http://git-wip-us.apache.org/repos/asf/ode/blob/501f70f2/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java
----------------------------------------------------------------------
diff --git a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java
index 84681b7..9f6fbce 100644
--- a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java
+++ b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/CorrelatorDAOImpl.java
@@ -36,7 +36,7 @@ import java.util.List;
public class CorrelatorDAOImpl extends OpenJPADAO implements CorrelatorDAO {
private static Logger __log = LoggerFactory.getLogger(CorrelatorDAOImpl.class);
public final static String DELETE_CORRELATORS_BY_PROCESS = "DELETE_CORRELATORS_BY_PROCESS";
- private final static String ROUTE_BY_CKEY_HEADER = "select route from MessageRouteDAOImpl as route where route._correlator._process._processType = :ptype and route._correlator._correlatorKey = :corrkey";
+ private final static String ROUTE_BY_CKEY_HEADER = "select route from MessageRouteDAOImpl as route where route._correlator = :corr ";
@Id
@Column(name = "CORRELATOR_ID")
@@ -77,6 +77,11 @@ public class CorrelatorDAOImpl extends OpenJPADAO implements CorrelatorDAO {
MessageExchangeDAOImpl mex = itr.next();
if (mex.getCorrelationKeySet().isRoutableTo(correlationKeySet, false)) {
itr.remove();
+ MessageExchangeDAOImpl mexImpl = (MessageExchangeDAOImpl) mex;
+ mexImpl.setCorrelationKeySet(null);
+ mexImpl.setCorrelator(null);
+ //getEM().remove(mex);
+ //getEM().flush();
return mex;
}
}
@@ -101,8 +106,7 @@ public class CorrelatorDAOImpl extends OpenJPADAO implements CorrelatorDAO {
}
List<CorrelationKeySet> subSets = correlationKeySet.findSubSets();
Query qry = getEM().createQuery(generateSelectorQuery(ROUTE_BY_CKEY_HEADER, subSets));
- qry.setParameter("ptype", _process.getType().toString());
- qry.setParameter("corrkey", _correlatorKey);
+ qry.setParameter("corr", this);
for (int i = 0; i < subSets.size(); i++) {
qry.setParameter("s" + i, subSets.get(i).toCanonicalString());
}
http://git-wip-us.apache.org/repos/asf/ode/blob/501f70f2/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java
----------------------------------------------------------------------
diff --git a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java
index c0f35ff..e334520 100644
--- a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java
+++ b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java
@@ -323,7 +323,7 @@ public class MessageExchangeDAOImpl extends OpenJPADAO implements MessageExchang
}
void setCorrelationKeySet(CorrelationKeySet correlationKeySet) {
- _correlationKeys = correlationKeySet.toCanonicalString();
+ _correlationKeys = correlationKeySet != null ? correlationKeySet.toCanonicalString() : null;
}
CorrelationKeySet getCorrelationKeySet() {
http://git-wip-us.apache.org/repos/asf/ode/blob/501f70f2/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java
----------------------------------------------------------------------
diff --git a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java
index 2b22d08..c109fda 100644
--- a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java
+++ b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java
@@ -44,7 +44,9 @@ import java.util.List;
@NamedQueries({
@NamedQuery(name="ActiveInstances", query="select i from ProcessInstanceDAOImpl as i where i._process = :process and i._state = :state"),
@NamedQuery(name="InstanceByCKey", query="select cs._scope._processInstance from CorrelationSetDAOImpl as cs where cs._correlationKey = :ckey"),
- @NamedQuery(name="CorrelatorByKey", query="select c from CorrelatorDAOImpl as c where c._correlatorKey = :ckey and c._process = :process")
+ @NamedQuery(name="CorrelatorByKey", query="select c from CorrelatorDAOImpl as c where c._correlatorKey = :ckey and c._process = :process"),
+ @NamedQuery(name="InstanceByCKeyProcessState", query="select cs._scope._processInstance from CorrelationSetDAOImpl as cs where cs._correlationKey = :ckey and "
+ + "cs._scope._processInstance._process = :process and cs._scope._processInstance._state = :state")
})
public class ProcessDAOImpl extends OpenJPADAO implements ProcessDAO {
private static final Logger __log = LoggerFactory.getLogger(ProcessDAOImpl.class);
@@ -226,4 +228,14 @@ public class ProcessDAOImpl extends OpenJPADAO implements ProcessDAO {
qry.setParameter("state", ProcessState.STATE_ACTIVE);
return qry.getResultList();
}
+
+ @Override
+ public Collection<ProcessInstanceDAO> findInstance(CorrelationKey ckey, short processInstanceState) {
+ //need to make this query more efficient
+ Query qry = getEM().createNamedQuery("InstanceByCKeyProcessState");
+ qry.setParameter("ckey", ckey.toCanonicalString());
+ qry.setParameter("process", this);
+ qry.setParameter("state", ProcessState.STATE_ACTIVE);
+ return qry.getResultList();
+ }
}