You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by sa...@apache.org on 2016/01/06 09:58:07 UTC
[1/2] ode git commit: Second matcher + logging fix for job retries.
Ported from trunk commit
https://github.com/apache/ode/commit/57e6d71bc153464ab71eb982293d138abbaf85b6
Repository: ode
Updated Branches:
refs/heads/ode-1.3.x c0c7b497a -> ba61210b4
Second matcher + logging fix for job retries. Ported from trunk commit
https://github.com/apache/ode/commit/57e6d71bc153464ab71eb982293d138abbaf85b6
Project: http://git-wip-us.apache.org/repos/asf/ode/repo
Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/566d3d1a
Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/566d3d1a
Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/566d3d1a
Branch: refs/heads/ode-1.3.x
Commit: 566d3d1ae886a7459cdfb75155af81cb9c82c91d
Parents: c0c7b49
Author: sathwik <sa...@apache.org>
Authored: Wed Jan 6 13:23:03 2016 +0530
Committer: sathwik <sa...@apache.org>
Committed: Wed Jan 6 13:23:03 2016 +0530
----------------------------------------------------------------------
.../java/org/apache/ode/axis2/ODEService.java | 2 +-
.../org/apache/ode/bpel/iapi/Scheduler.java | 9 +-
.../apache/ode/bpel/dao/MessageExchangeDAO.java | 2 +
.../apache/ode/bpel/engine/BpelEngineImpl.java | 21 +++--
.../org/apache/ode/bpel/engine/BpelProcess.java | 18 ++--
.../ode/bpel/engine/PartnerLinkMyRoleImpl.java | 14 +++
.../ode/bpel/memdao/MessageExchangeDAOImpl.java | 3 +
.../ode/daohib/bpel/CorrelatorDaoImpl.java | 30 ++++---
.../ode/daohib/bpel/MessageExchangeDaoImpl.java | 13 ++-
.../daohib/bpel/hobj/HCorrelatorMessage.java | 2 +
.../org/apache/ode/daohib/bpel/MexTest.java | 93 ++++++++++++++++++++
.../ode/dao/jpa/MessageExchangeDAOImpl.java | 4 +
.../java/org/apache/ode/jacob/vpu/JacobVPU.java | 12 ++-
13 files changed, 192 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/axis2/src/main/java/org/apache/ode/axis2/ODEService.java
----------------------------------------------------------------------
diff --git a/axis2/src/main/java/org/apache/ode/axis2/ODEService.java b/axis2/src/main/java/org/apache/ode/axis2/ODEService.java
index f3876d4..35c090b 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/ODEService.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/ODEService.java
@@ -112,7 +112,7 @@ public class ODEService {
String messageId = new GUID().toString();
odeMex = _server.getEngine().createMessageExchange("" + messageId, _serviceName,
msgContext.getAxisOperation().getName().getLocalPart());
- __log.debug("ODE routed to operation " + odeMex.getOperation() + " from service " + _serviceName);
+ __log.debug("ODE routed to portType " + odeMex.getPortType() + " operation " + odeMex.getOperation() + " from service " + _serviceName);
odeMex.setProperty("isTwoWay", Boolean.toString(msgContext.getAxisOperation() instanceof TwoChannelAxisOperation));
if (odeMex.getOperation() != null) {
// Preparing message to send to ODE
http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java
----------------------------------------------------------------------
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java b/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java
index 52a6a31..85079b2 100644
--- a/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java
@@ -208,7 +208,14 @@ public interface Scheduler {
* out, checks whether a response has arrived and if not, it marks the MEX as
* faulted.
*/
- INVOKE_CHECK
+ INVOKE_CHECK,
+
+ /**
+ * is used to avoid the race condition when a message has been correlated but
+ * no process instance is able to process it and the route has been added
+ * meanwhile. It just retries the correlation.
+ */
+ MEX_MATCHER
}
public static class JobDetails {
http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java
----------------------------------------------------------------------
diff --git a/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java b/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java
index 786c102..5de9bbe 100644
--- a/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java
+++ b/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java
@@ -252,4 +252,6 @@ public interface MessageExchangeDAO {
* Deletes messages that arrived before the route is setup
*/
void releasePremieMessages();
+
+ boolean lockPremieMessages();
}
http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
----------------------------------------------------------------------
diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
index 7fdf924..6a1edce 100644
--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
@@ -247,13 +247,16 @@ public class BpelEngineImpl implements BpelEngine {
case MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE:
mex = new MyRoleMessageExchangeImpl(process, this, mexdao);
if (process != null) {
- OPartnerLink plink = (OPartnerLink) process.getOProcess().getChild(mexdao.getPartnerLinkModelId());
- // the partner link might not be hydrated
- if (plink != null) {
- PortType ptype = plink.myRolePortType;
- Operation op = plink.getMyRoleOperation(mexdao.getOperation());
- mex.setPortOp(ptype, op);
- }
+ Object child = process.getOProcess().getChild(mexdao.getPartnerLinkModelId());
+ if (child instanceof OPartnerLink) {
+ OPartnerLink plink = (OPartnerLink) child;
+ // the partner link might not be hydrated
+ if (plink != null) {
+ PortType ptype = plink.myRolePortType;
+ Operation op = plink.getMyRoleOperation(mexdao.getOperation());
+ mex.setPortOp(ptype, op);
+ }
+ }
}
break;
default:
@@ -477,7 +480,7 @@ public class BpelEngineImpl implements BpelEngine {
}
}
- if (we.getType().equals(JobType.INVOKE_INTERNAL)) {
+ if (we.getType() == JobType.INVOKE_INTERNAL || we.getType() == JobType.MEX_MATCHER) {
List<BpelProcess> processes = getAllProcesses(we.getProcessId());
boolean routed = false;
jobInfo.jobDetail.detailsExt.put("enqueue", false);
@@ -486,7 +489,7 @@ public class BpelEngineImpl implements BpelEngine {
routed = routed || proc.handleJobDetails(jobInfo.jobDetail);
}
- if(!routed) {
+ if(!routed && we.getType() == JobType.INVOKE_INTERNAL) {
jobInfo.jobDetail.detailsExt.put("enqueue", true);
process.handleJobDetails(jobInfo.jobDetail);
}
http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
----------------------------------------------------------------------
diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
index cfddef7..afe669a 100644
--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
@@ -414,7 +414,7 @@ public class BpelProcess {
* @see org.apache.ode.bpel.engine.BpelProcess#handleJobDetails(java.util.Map<java.lang.String,java.lang.Object>)
*/
public boolean handleJobDetails(JobDetails jobData) {
- boolean ret = true;
+ boolean routed = true;
try {
_hydrationLatch.latch(1);
markused();
@@ -425,12 +425,20 @@ public class BpelProcess {
JobDetails we = jobData;
// Process level events
- if (we.getType().equals(JobType.INVOKE_INTERNAL)) {
+ if (we.getType() == JobType.INVOKE_INTERNAL || we.getType() == JobType.MEX_MATCHER) {
if (__log.isDebugEnabled()) {
- __log.debug("InvokeInternal event for mexid " + we.getMexId());
+ __log.debug(we.getType() + " event for mexid " + we.getMexId());
}
MyRoleMessageExchangeImpl mex = (MyRoleMessageExchangeImpl) _engine.getMessageExchange(we.getMexId());
- ret = invokeProcess(mex, (Boolean) jobData.detailsExt.get("enqueue"));
+ if (we.getType() == JobType.MEX_MATCHER && !mex.getDAO().lockPremieMessages()) {
+ //Skip if already processed
+ return true;
+ }
+
+ routed = invokeProcess(mex, (Boolean) jobData.detailsExt.get("enqueue"));
+ if (we.getType() == JobType.MEX_MATCHER && routed) {
+ mex.getDAO().releasePremieMessages();
+ }
} else {
// Instance level events
ProcessInstanceDAO procInstance = getProcessDAO().getInstance(we.getInstanceId());
@@ -477,7 +485,7 @@ public class BpelProcess {
} finally {
_hydrationLatch.release(1);
}
- return ret;
+ return routed;
}
private void setRoles(OProcess oprocess) {
http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
----------------------------------------------------------------------
diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
index 4537286..203a199 100644
--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
@@ -45,6 +45,8 @@ import org.apache.ode.bpel.iapi.Endpoint;
import org.apache.ode.bpel.iapi.MessageExchange;
import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
import org.apache.ode.bpel.iapi.ProcessState;
+import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
+import org.apache.ode.bpel.iapi.Scheduler.JobType;
import org.apache.ode.bpel.intercept.InterceptorInvoker;
import org.apache.ode.bpel.o.OMessageVarType;
import org.apache.ode.bpel.o.OPartnerLink;
@@ -275,6 +277,18 @@ public class PartnerLinkMyRoleImpl extends PartnerLinkRoleImpl {
// No match, means we add message exchange to the queue.
routing.correlator.enqueueMessage(mex.getDAO(), routing.wholeKeySet);
+
+ //Second matcher needs to be registered here
+ JobDetails we = new JobDetails();
+ we.setType(JobType.MEX_MATCHER);
+ we.setProcessId(_process.getPID());
+ we.setMexId(mex.getMessageExchangeId());
+ we.setInMem(_process.isInMemory());
+ if(_process.isInMemory()){
+ _process._engine._contexts.scheduler.scheduleVolatileJob(true, we);
+ }else{
+ _process._engine._contexts.scheduler.schedulePersistedJob(we, null);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java
----------------------------------------------------------------------
diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java
index 9f71b30..5369694 100644
--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java
+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java
@@ -305,4 +305,7 @@ public class MessageExchangeDAOImpl extends DaoBaseImpl implements MessageExchan
return "mem.mex(direction=" + direction + " id=" + messageExchangeId + ")";
}
+ public boolean lockPremieMessages() {
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
----------------------------------------------------------------------
diff --git a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
index 42fc3e6..c4cbd6c 100644
--- a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
+++ b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
import org.apache.ode.bpel.common.CorrelationKey;
import org.apache.ode.bpel.common.CorrelationKeySet;
import org.apache.ode.bpel.dao.*;
+import org.apache.ode.bpel.iapi.Scheduler;
import org.apache.ode.daohib.SessionManager;
import org.apache.ode.daohib.bpel.hobj.HCorrelator;
import org.apache.ode.daohib.bpel.hobj.HCorrelatorMessage;
@@ -36,6 +37,7 @@ import org.hibernate.Hibernate;
import org.hibernate.LockMode;
import org.hibernate.Query;
import org.hibernate.Session;
+import org.hibernate.exception.LockAcquisitionException;
import javax.xml.namespace.QName;
@@ -82,7 +84,12 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO {
// We really should consider the possibility of multiple messages matching a criteria.
// When the message is handled, its not too convenient to attempt to determine if the
// received message conflicts with one already received.
- Iterator mcors = qry.iterate();
+ Iterator mcors;
+ try {
+ mcors = qry.setLockMode("this", LockMode.UPGRADE).iterate();
+ } catch (LockAcquisitionException e) {
+ throw new Scheduler.JobProcessorException(e, true);
+ }
try {
if (!mcors.hasNext()) {
if (__log.isDebugEnabled())
@@ -122,7 +129,13 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO {
q.setLockMode("hs", LockMode.UPGRADE);
List<HProcessInstance> targets = new ArrayList<HProcessInstance>();
- for (HCorrelatorSelector selector : (List<HCorrelatorSelector>)q.list()) {
+ List<HCorrelatorSelector> list;
+ try {
+ list = (List<HCorrelatorSelector>) q.list();
+ } catch (LockAcquisitionException e) {
+ throw new Scheduler.JobProcessorException(e, true);
+ }
+ for (HCorrelatorSelector selector : list) {
if (selector != null) {
boolean isRoutePolicyOne = selector.getRoute() == null || "one".equals(selector.getRoute());
if ("all".equals(selector.getRoute()) ||
@@ -135,13 +148,6 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO {
if(__log.isDebugEnabled()) __log.debug(hdr + "found " + routes);
- // obtain a lock on the correlator to eliminate potential race condition.
- if(__log.isDebugEnabled()) __log.debug("Obtain record lock on " + _hobj);
- Query correlatorLockQuery = getSession().createQuery("from HCorrelator as hc where id = :id");
- correlatorLockQuery.setLong("id", _hobj.getId());
- correlatorLockQuery.setLockMode("hc", LockMode.UPGRADE);
- correlatorLockQuery.list();
-
return routes;
}
@@ -221,7 +227,11 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO {
hsel.setCorrelator(_hobj);
hsel.setCreated(new Date());
hsel.setRoute(routePolicy);
- getSession().save(hsel);
+ try {
+ getSession().save(hsel);
+ } catch (LockAcquisitionException e) {
+ throw new Scheduler.JobProcessorException(e, true);
+ }
if (__log.isDebugEnabled())
__log.debug(hdr + "saved " + hsel);
http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java
----------------------------------------------------------------------
diff --git a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java
index 5b711ce..91fd903 100644
--- a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java
+++ b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java
@@ -34,6 +34,7 @@ import org.apache.ode.bpel.dao.MessageExchangeDAO;
import org.apache.ode.bpel.dao.PartnerLinkDAO;
import org.apache.ode.bpel.dao.ProcessDAO;
import org.apache.ode.bpel.dao.ProcessInstanceDAO;
+import org.apache.ode.bpel.iapi.Scheduler;
import org.apache.ode.daohib.SessionManager;
import org.apache.ode.daohib.bpel.hobj.HCorrelatorMessage;
import org.apache.ode.daohib.bpel.hobj.HMessage;
@@ -43,6 +44,8 @@ import org.apache.ode.daohib.bpel.hobj.HProcessInstance;
import org.apache.ode.utils.DOMUtils;
import org.apache.ode.utils.stl.CollectionsX;
import org.apache.ode.utils.stl.UnaryFunctionEx;
+import org.hibernate.LockMode;
+import org.hibernate.exception.LockAcquisitionException;
import org.w3c.dom.Element;
public class MessageExchangeDaoImpl extends HibernateDao implements
@@ -377,5 +380,13 @@ public class MessageExchangeDaoImpl extends HibernateDao implements
getSession().delete(_hself);
// This deletes endpoint LData, callbackEndpoint LData, request HMessage, response HMessage, HMessageExchangeProperty
}
-
+
+ public boolean lockPremieMessages() {
+ try {
+ return getSession().getNamedQuery(HCorrelatorMessage.SELECT_CORMESSAGE_BY_MEX).setLockMode("m", LockMode.UPGRADE).setParameter("mex", _hself).list().size() > 0;
+ } catch (LockAcquisitionException e) {
+ throw new Scheduler.JobProcessorException(e, true);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorMessage.java
----------------------------------------------------------------------
diff --git a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorMessage.java b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorMessage.java
index 92a7747..0014244 100644
--- a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorMessage.java
+++ b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorMessage.java
@@ -23,10 +23,12 @@ package org.apache.ode.daohib.bpel.hobj;
* @hibernate.class table="BPEL_UNMATCHED" lazy="true"
* @hibernate.query name="SELECT_CORMESSAGE_IDS_BY_INSTANCES" query="select id from HCorrelatorMessage as m where m.messageExchange in(select mex from HMessageExchange as mex where mex.instance in (:instances))"
* @hibernate.query name="SELECT_CORMESSAGE_IDS_BY_MEX" query="select id from HCorrelatorMessage as m where m.messageExchange = :mex"
+ * @hibernate.query name="SELECT_CORMESSAGE_BY_MEX" query="from HCorrelatorMessage as m where m.messageExchange = :mex"
*/
public class HCorrelatorMessage extends HObject {
public final static String SELECT_CORMESSAGE_IDS_BY_MEX = "SELECT_CORMESSAGE_IDS_BY_MEX";
public final static String SELECT_CORMESSAGE_IDS_BY_INSTANCES = "SELECT_CORMESSAGE_IDS_BY_INSTANCES";
+ public final static String SELECT_CORMESSAGE_BY_MEX = "SELECT_CORMESSAGE_BY_MEX";
private HMessageExchange _messageExchange;
private HCorrelator _correlator;
http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/dao-hibernate/src/test/java/org/apache/ode/daohib/bpel/MexTest.java
----------------------------------------------------------------------
diff --git a/dao-hibernate/src/test/java/org/apache/ode/daohib/bpel/MexTest.java b/dao-hibernate/src/test/java/org/apache/ode/daohib/bpel/MexTest.java
new file mode 100644
index 0000000..741a513
--- /dev/null
+++ b/dao-hibernate/src/test/java/org/apache/ode/daohib/bpel/MexTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.daohib.bpel;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.xml.namespace.QName;
+
+import org.apache.ode.bpel.common.CorrelationKeySet;
+import org.apache.ode.bpel.common.InstanceFilter;
+import org.apache.ode.bpel.dao.CorrelatorDAO;
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
+import org.apache.ode.bpel.dao.ProcessDAO;
+import org.apache.ode.bpel.dao.ProcessInstanceDAO;
+import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY;
+import org.apache.ode.daohib.SessionManager;
+import org.apache.ode.daohib.bpel.hobj.HCorrelator;
+
+/**
+ * Testing BpelDAOConnectionImpl.listInstance. We're just producing a lot of
+ * different filter combinations and test if they execute ok. To really test
+ * that the result is the one expected would take a huge test database (with at
+ * least a process and an instance for every possible combination).
+ */
+public class MexTest extends BaseTestDAO {
+
+ private Map<String, List> filterElmts;
+ private ArrayList<String> order;
+
+ protected void setUp() throws Exception {
+ initTM();
+ }
+
+ protected void tearDown() throws Exception {
+ stopTM();
+ }
+
+ public void test() throws Exception {
+ MessageExchangeDAO mex = daoConn.createMessageExchange('M');
+ mex.lockPremieMessages();
+
+ SessionManager sm = ((BpelDAOConnectionImpl) daoConn)._sm;
+ HCorrelator correlator = new HCorrelator();
+ correlator.setCorrelatorId("abc");
+ sm.getSession().save(correlator);
+ new CorrelatorDaoImpl(sm, correlator).dequeueMessage(new CorrelationKeySet("@2[12~a~b]"));
+ }
+
+ public void testCleanup() throws Exception {
+ SessionManager sm = ((BpelDAOConnectionImpl) daoConn)._sm;
+ ProcessDAO p = daoConn.createProcess(QName.valueOf("abc"), QName.valueOf("abc"), "abc", 1);
+ CorrelatorDAO correlator = p.addCorrelator("abc");
+ ProcessInstanceDAO instance = p.createInstance(correlator);
+
+ MessageExchangeDAO mex = daoConn.createMessageExchange('M');
+ mex.setProperty("abc", "def");
+ mex.setInstance(instance);
+
+ txm.commit();
+ txm.begin();
+ assertEquals(1, sm.getSession().createSQLQuery("select count(*) from BPEL_MEX_PROPS").list().get(0));
+
+ Set<CLEANUP_CATEGORY> cleanupCategories = EnumSet.allOf(CLEANUP_CATEGORY.class);
+ instance.delete(cleanupCategories);
+ txm.commit();
+ txm.begin();
+
+ assertEquals(0, sm.getSession().createSQLQuery("select count(*) from BPEL_MEX_PROPS").list().get(0));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java
----------------------------------------------------------------------
diff --git a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java
index d9e5617..c0f35ff 100644
--- a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java
+++ b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java
@@ -383,4 +383,8 @@ public class MessageExchangeDAOImpl extends OpenJPADAO implements MessageExchang
public void setCreateTime(Date createTime) {
_createTime = createTime;
}
+
+ public boolean lockPremieMessages() {
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
----------------------------------------------------------------------
diff --git a/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java b/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
index 1ea83d6..0d4f622 100644
--- a/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
+++ b/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
@@ -458,10 +458,14 @@ public final class JacobVPU {
__log.error(msg, iae);
throw new RuntimeException(msg, iae);
} catch (InvocationTargetException e) {
- String msg = __msgs.msgClientMethodException(_method.getName(),
- _methodBody.getClass().getName());
- __log.error(msg, e.getTargetException());
- throw new RuntimeException(e.getTargetException());
+ if (e.getTargetException() instanceof RuntimeException) {
+ throw (RuntimeException) e.getTargetException();
+ } else {
+ String msg = __msgs.msgClientMethodException(_method.getName(),
+ _methodBody.getClass().getName());
+ __log.error(msg, e.getTargetException());
+ throw new RuntimeException(e.getTargetException());
+ }
} finally {
ctime = System.currentTimeMillis() - ctime;
_statistics.totalClientTimeMs += ctime;
[2/2] ode git commit: Avoid Read/Write Deadlocks in Hibernate,
Ported from trunk commit.
https://github.com/apache/ode/commit/19b0786f9ec8928fca956597326004196bf61eaa
Posted by sa...@apache.org.
Avoid Read/Write Deadlocks in Hibernate, Ported from trunk commit.
https://github.com/apache/ode/commit/19b0786f9ec8928fca956597326004196bf61eaa
Project: http://git-wip-us.apache.org/repos/asf/ode/repo
Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/ba61210b
Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/ba61210b
Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/ba61210b
Branch: refs/heads/ode-1.3.x
Commit: ba61210b426b4cb12596a46ad368f3a25bff6794
Parents: 566d3d1
Author: sathwik <sa...@apache.org>
Authored: Wed Jan 6 13:39:10 2016 +0530
Committer: sathwik <sa...@apache.org>
Committed: Wed Jan 6 13:39:10 2016 +0530
----------------------------------------------------------------------
.../src/main/java/org/apache/ode/bpel/engine/BpelProcess.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ode/blob/ba61210b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
----------------------------------------------------------------------
diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
index afe669a..b5cef27 100644
--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
@@ -265,9 +265,9 @@ public class BpelProcess {
// For a one way, once the engine is done, the mex can be safely released.
// Sean: not really, if route is not found, we cannot delete the mex yet
- if (mex.getPattern().equals(MessageExchange.MessageExchangePattern.REQUEST_ONLY) && routed && getCleanupCategories(false).contains(CLEANUP_CATEGORY.MESSAGES)) {
- mex.release();
- }
+// if (mex.getPattern().equals(MessageExchange.MessageExchangePattern.REQUEST_ONLY) && routed && getCleanupCategories(false).contains(CLEANUP_CATEGORY.MESSAGES)) {
+// mex.release();
+// }
return routed;
}