You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by sa...@apache.org on 2016/01/06 09:58:07 UTC

[1/2] ode git commit: Second matcher + logging fix for job retries. Ported from trunk commit https://github.com/apache/ode/commit/57e6d71bc153464ab71eb982293d138abbaf85b6

Repository: ode
Updated Branches:
  refs/heads/ode-1.3.x c0c7b497a -> ba61210b4


Second matcher + logging fix for job retries. Ported from trunk commit
https://github.com/apache/ode/commit/57e6d71bc153464ab71eb982293d138abbaf85b6


Project: http://git-wip-us.apache.org/repos/asf/ode/repo
Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/566d3d1a
Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/566d3d1a
Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/566d3d1a

Branch: refs/heads/ode-1.3.x
Commit: 566d3d1ae886a7459cdfb75155af81cb9c82c91d
Parents: c0c7b49
Author: sathwik <sa...@apache.org>
Authored: Wed Jan 6 13:23:03 2016 +0530
Committer: sathwik <sa...@apache.org>
Committed: Wed Jan 6 13:23:03 2016 +0530

----------------------------------------------------------------------
 .../java/org/apache/ode/axis2/ODEService.java   |  2 +-
 .../org/apache/ode/bpel/iapi/Scheduler.java     |  9 +-
 .../apache/ode/bpel/dao/MessageExchangeDAO.java |  2 +
 .../apache/ode/bpel/engine/BpelEngineImpl.java  | 21 +++--
 .../org/apache/ode/bpel/engine/BpelProcess.java | 18 ++--
 .../ode/bpel/engine/PartnerLinkMyRoleImpl.java  | 14 +++
 .../ode/bpel/memdao/MessageExchangeDAOImpl.java |  3 +
 .../ode/daohib/bpel/CorrelatorDaoImpl.java      | 30 ++++---
 .../ode/daohib/bpel/MessageExchangeDaoImpl.java | 13 ++-
 .../daohib/bpel/hobj/HCorrelatorMessage.java    |  2 +
 .../org/apache/ode/daohib/bpel/MexTest.java     | 93 ++++++++++++++++++++
 .../ode/dao/jpa/MessageExchangeDAOImpl.java     |  4 +
 .../java/org/apache/ode/jacob/vpu/JacobVPU.java | 12 ++-
 13 files changed, 192 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/axis2/src/main/java/org/apache/ode/axis2/ODEService.java
----------------------------------------------------------------------
diff --git a/axis2/src/main/java/org/apache/ode/axis2/ODEService.java b/axis2/src/main/java/org/apache/ode/axis2/ODEService.java
index f3876d4..35c090b 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/ODEService.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/ODEService.java
@@ -112,7 +112,7 @@ public class ODEService {
             String messageId = new GUID().toString();
             odeMex = _server.getEngine().createMessageExchange("" + messageId, _serviceName,
                     msgContext.getAxisOperation().getName().getLocalPart());
-            __log.debug("ODE routed to operation " + odeMex.getOperation() + " from service " + _serviceName);
+            __log.debug("ODE routed to portType " + odeMex.getPortType() + " operation " + odeMex.getOperation() + " from service " + _serviceName);
             odeMex.setProperty("isTwoWay", Boolean.toString(msgContext.getAxisOperation() instanceof TwoChannelAxisOperation));
             if (odeMex.getOperation() != null) {
                 // Preparing message to send to ODE

http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java
----------------------------------------------------------------------
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java b/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java
index 52a6a31..85079b2 100644
--- a/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java
@@ -208,7 +208,14 @@ public interface Scheduler {
          * out, checks whether a response has arrived and if not, it marks the MEX as
          * faulted.
          */
-        INVOKE_CHECK
+        INVOKE_CHECK,
+
+        /**
+         * is used to avoid the race condition when a message has been correlated but
+         * no process instance is able to process it and the route has been added
+         * meanwhile. It just retries the correlation.
+         */
+        MEX_MATCHER
     }
     
     public static class JobDetails {

http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java
----------------------------------------------------------------------
diff --git a/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java b/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java
index 786c102..5de9bbe 100644
--- a/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java
+++ b/bpel-dao/src/main/java/org/apache/ode/bpel/dao/MessageExchangeDAO.java
@@ -252,4 +252,6 @@ public interface MessageExchangeDAO {
      * Deletes messages that arrived before the route is setup
      */
     void releasePremieMessages();
+
+    boolean lockPremieMessages();
 }

http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
----------------------------------------------------------------------
diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
index 7fdf924..6a1edce 100644
--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
@@ -247,13 +247,16 @@ public class BpelEngineImpl implements BpelEngine {
         case MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE:
             mex = new MyRoleMessageExchangeImpl(process, this, mexdao);
             if (process != null) {
-                OPartnerLink plink = (OPartnerLink) process.getOProcess().getChild(mexdao.getPartnerLinkModelId());
-                // the partner link might not be hydrated
-                if (plink != null) {
-                    PortType ptype = plink.myRolePortType;
-                    Operation op = plink.getMyRoleOperation(mexdao.getOperation());
-                    mex.setPortOp(ptype, op);
-                }
+				Object child = process.getOProcess().getChild(mexdao.getPartnerLinkModelId());
+				if (child instanceof OPartnerLink) {
+				    OPartnerLink plink = (OPartnerLink) child;
+				    // the partner link might not be hydrated
+					if (plink != null) {
+					    PortType ptype = plink.myRolePortType;
+					    Operation op = plink.getMyRoleOperation(mexdao.getOperation());
+					    mex.setPortOp(ptype, op);
+					}
+				}
             }
             break;
         default:
@@ -477,7 +480,7 @@ public class BpelEngineImpl implements BpelEngine {
                     }
                 }
 
-                if (we.getType().equals(JobType.INVOKE_INTERNAL)) {
+                if (we.getType() == JobType.INVOKE_INTERNAL || we.getType() == JobType.MEX_MATCHER) {
                     List<BpelProcess> processes = getAllProcesses(we.getProcessId());
                     boolean routed = false;
                     jobInfo.jobDetail.detailsExt.put("enqueue", false);
@@ -486,7 +489,7 @@ public class BpelEngineImpl implements BpelEngine {
                         routed = routed || proc.handleJobDetails(jobInfo.jobDetail);
                     }
 
-                    if(!routed) {
+                    if(!routed && we.getType() == JobType.INVOKE_INTERNAL) {
                         jobInfo.jobDetail.detailsExt.put("enqueue", true);
                         process.handleJobDetails(jobInfo.jobDetail);
                     }

http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
----------------------------------------------------------------------
diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
index cfddef7..afe669a 100644
--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
@@ -414,7 +414,7 @@ public class BpelProcess {
      * @see org.apache.ode.bpel.engine.BpelProcess#handleJobDetails(java.util.Map<java.lang.String,java.lang.Object>)
      */
     public boolean handleJobDetails(JobDetails jobData) {
-        boolean ret = true;
+        boolean routed = true;
         try {
             _hydrationLatch.latch(1);
             markused();
@@ -425,12 +425,20 @@ public class BpelProcess {
             JobDetails we = jobData;
 
             // Process level events
-            if (we.getType().equals(JobType.INVOKE_INTERNAL)) {
+            if (we.getType() == JobType.INVOKE_INTERNAL || we.getType() == JobType.MEX_MATCHER) {
                 if (__log.isDebugEnabled()) {
-                    __log.debug("InvokeInternal event for mexid " + we.getMexId());
+                    __log.debug(we.getType() + " event for mexid " + we.getMexId());
                 }
                 MyRoleMessageExchangeImpl mex = (MyRoleMessageExchangeImpl) _engine.getMessageExchange(we.getMexId());
-                ret = invokeProcess(mex, (Boolean) jobData.detailsExt.get("enqueue"));
+                if (we.getType() == JobType.MEX_MATCHER && !mex.getDAO().lockPremieMessages()) {
+                    //Skip if already processed
+                    return true;
+                }
+
+                routed = invokeProcess(mex, (Boolean) jobData.detailsExt.get("enqueue"));
+                if (we.getType() == JobType.MEX_MATCHER && routed) {
+                    mex.getDAO().releasePremieMessages();
+                }
             } else {
                 // Instance level events
                 ProcessInstanceDAO procInstance = getProcessDAO().getInstance(we.getInstanceId());
@@ -477,7 +485,7 @@ public class BpelProcess {
         } finally {
             _hydrationLatch.release(1);
         }
-        return ret;
+        return routed;
     }
 
     private void setRoles(OProcess oprocess) {

http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
----------------------------------------------------------------------
diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
index 4537286..203a199 100644
--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerLinkMyRoleImpl.java
@@ -45,6 +45,8 @@ import org.apache.ode.bpel.iapi.Endpoint;
 import org.apache.ode.bpel.iapi.MessageExchange;
 import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
 import org.apache.ode.bpel.iapi.ProcessState;
+import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
+import org.apache.ode.bpel.iapi.Scheduler.JobType;
 import org.apache.ode.bpel.intercept.InterceptorInvoker;
 import org.apache.ode.bpel.o.OMessageVarType;
 import org.apache.ode.bpel.o.OPartnerLink;
@@ -275,6 +277,18 @@ public class PartnerLinkMyRoleImpl extends PartnerLinkRoleImpl {
 	
 	            // No match, means we add message exchange to the queue.
 	            routing.correlator.enqueueMessage(mex.getDAO(), routing.wholeKeySet);
+
+                //Second matcher needs to be registered here
+                JobDetails we = new JobDetails();
+                we.setType(JobType.MEX_MATCHER);
+                we.setProcessId(_process.getPID());
+                we.setMexId(mex.getMessageExchangeId());
+                we.setInMem(_process.isInMemory());
+                if(_process.isInMemory()){
+                    _process._engine._contexts.scheduler.scheduleVolatileJob(true, we);
+                }else{
+                    _process._engine._contexts.scheduler.schedulePersistedJob(we, null);
+                }
         	}
         }
     }

http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java
----------------------------------------------------------------------
diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java
index 9f71b30..5369694 100644
--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java
+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java
@@ -305,4 +305,7 @@ public class MessageExchangeDAOImpl extends DaoBaseImpl implements MessageExchan
         return "mem.mex(direction=" + direction + " id=" + messageExchangeId + ")";
     }
 
+	public boolean lockPremieMessages() {
+			return true;
+	}
 }

http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
----------------------------------------------------------------------
diff --git a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
index 42fc3e6..c4cbd6c 100644
--- a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
+++ b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/CorrelatorDaoImpl.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.ode.bpel.common.CorrelationKey;
 import org.apache.ode.bpel.common.CorrelationKeySet;
 import org.apache.ode.bpel.dao.*;
+import org.apache.ode.bpel.iapi.Scheduler;
 import org.apache.ode.daohib.SessionManager;
 import org.apache.ode.daohib.bpel.hobj.HCorrelator;
 import org.apache.ode.daohib.bpel.hobj.HCorrelatorMessage;
@@ -36,6 +37,7 @@ import org.hibernate.Hibernate;
 import org.hibernate.LockMode;
 import org.hibernate.Query;
 import org.hibernate.Session;
+import org.hibernate.exception.LockAcquisitionException;
 
 import javax.xml.namespace.QName;
 
@@ -82,7 +84,12 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO {
         // We really should consider the possibility of multiple messages matching a criteria.
         // When the message is handled, its not too convenient to attempt to determine if the
         // received message conflicts with one already received.
-        Iterator mcors = qry.iterate();
+        Iterator mcors;
+        try {
+            mcors = qry.setLockMode("this", LockMode.UPGRADE).iterate();
+        } catch (LockAcquisitionException e) {
+            throw new Scheduler.JobProcessorException(e, true);
+        }
         try {
             if (!mcors.hasNext()) {
                 if (__log.isDebugEnabled())
@@ -122,7 +129,13 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO {
         q.setLockMode("hs", LockMode.UPGRADE);
 
         List<HProcessInstance> targets = new ArrayList<HProcessInstance>();
-        for (HCorrelatorSelector selector : (List<HCorrelatorSelector>)q.list()) {
+        List<HCorrelatorSelector> list;
+        try {
+            list = (List<HCorrelatorSelector>) q.list();
+        } catch (LockAcquisitionException e) {
+            throw new Scheduler.JobProcessorException(e, true);
+        }
+        for (HCorrelatorSelector selector : list) {
             if (selector != null) {
                 boolean isRoutePolicyOne = selector.getRoute() == null || "one".equals(selector.getRoute());
                 if ("all".equals(selector.getRoute()) ||
@@ -135,13 +148,6 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO {
 
         if(__log.isDebugEnabled()) __log.debug(hdr + "found " + routes);
 
-        // obtain a lock on the correlator to eliminate potential race condition.
-        if(__log.isDebugEnabled()) __log.debug("Obtain record lock on " + _hobj);
-        Query correlatorLockQuery = getSession().createQuery("from HCorrelator as hc where id = :id");
-        correlatorLockQuery.setLong("id", _hobj.getId());
-        correlatorLockQuery.setLockMode("hc", LockMode.UPGRADE);
-        correlatorLockQuery.list();
-
         return routes;
     }
 
@@ -221,7 +227,11 @@ class CorrelatorDaoImpl extends HibernateDao implements CorrelatorDAO {
         hsel.setCorrelator(_hobj);
         hsel.setCreated(new Date());
         hsel.setRoute(routePolicy);
-        getSession().save(hsel);
+        try {
+            getSession().save(hsel);
+        } catch (LockAcquisitionException e) {
+            throw new Scheduler.JobProcessorException(e, true);
+        }
 
         if (__log.isDebugEnabled())
             __log.debug(hdr + "saved " + hsel);

http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java
----------------------------------------------------------------------
diff --git a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java
index 5b711ce..91fd903 100644
--- a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java
+++ b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java
@@ -34,6 +34,7 @@ import org.apache.ode.bpel.dao.MessageExchangeDAO;
 import org.apache.ode.bpel.dao.PartnerLinkDAO;
 import org.apache.ode.bpel.dao.ProcessDAO;
 import org.apache.ode.bpel.dao.ProcessInstanceDAO;
+import org.apache.ode.bpel.iapi.Scheduler;
 import org.apache.ode.daohib.SessionManager;
 import org.apache.ode.daohib.bpel.hobj.HCorrelatorMessage;
 import org.apache.ode.daohib.bpel.hobj.HMessage;
@@ -43,6 +44,8 @@ import org.apache.ode.daohib.bpel.hobj.HProcessInstance;
 import org.apache.ode.utils.DOMUtils;
 import org.apache.ode.utils.stl.CollectionsX;
 import org.apache.ode.utils.stl.UnaryFunctionEx;
+import org.hibernate.LockMode;
+import org.hibernate.exception.LockAcquisitionException;
 import org.w3c.dom.Element;
 
 public class MessageExchangeDaoImpl extends HibernateDao implements
@@ -377,5 +380,13 @@ public class MessageExchangeDaoImpl extends HibernateDao implements
         getSession().delete(_hself);
         // This deletes endpoint LData, callbackEndpoint LData, request HMessage, response HMessage, HMessageExchangeProperty 
     }
-    
+
+    public boolean lockPremieMessages() {
+        try {
+            return getSession().getNamedQuery(HCorrelatorMessage.SELECT_CORMESSAGE_BY_MEX).setLockMode("m", LockMode.UPGRADE).setParameter("mex", _hself).list().size() > 0;
+        } catch (LockAcquisitionException e) {
+            throw new Scheduler.JobProcessorException(e, true);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorMessage.java
----------------------------------------------------------------------
diff --git a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorMessage.java b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorMessage.java
index 92a7747..0014244 100644
--- a/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorMessage.java
+++ b/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorMessage.java
@@ -23,10 +23,12 @@ package org.apache.ode.daohib.bpel.hobj;
  * @hibernate.class table="BPEL_UNMATCHED" lazy="true"
  * @hibernate.query name="SELECT_CORMESSAGE_IDS_BY_INSTANCES" query="select id from HCorrelatorMessage as m where m.messageExchange in(select mex from HMessageExchange as mex where mex.instance in (:instances))"
  * @hibernate.query name="SELECT_CORMESSAGE_IDS_BY_MEX" query="select id from HCorrelatorMessage as m where m.messageExchange = :mex"
+ * @hibernate.query name="SELECT_CORMESSAGE_BY_MEX" query="from HCorrelatorMessage as m where m.messageExchange = :mex"
  */
 public class HCorrelatorMessage extends HObject {
     public final static String SELECT_CORMESSAGE_IDS_BY_MEX = "SELECT_CORMESSAGE_IDS_BY_MEX";
     public final static String SELECT_CORMESSAGE_IDS_BY_INSTANCES = "SELECT_CORMESSAGE_IDS_BY_INSTANCES";
+    public final static String SELECT_CORMESSAGE_BY_MEX = "SELECT_CORMESSAGE_BY_MEX";
 
     private HMessageExchange _messageExchange;
     private HCorrelator _correlator;

http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/dao-hibernate/src/test/java/org/apache/ode/daohib/bpel/MexTest.java
----------------------------------------------------------------------
diff --git a/dao-hibernate/src/test/java/org/apache/ode/daohib/bpel/MexTest.java b/dao-hibernate/src/test/java/org/apache/ode/daohib/bpel/MexTest.java
new file mode 100644
index 0000000..741a513
--- /dev/null
+++ b/dao-hibernate/src/test/java/org/apache/ode/daohib/bpel/MexTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.daohib.bpel;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.xml.namespace.QName;
+
+import org.apache.ode.bpel.common.CorrelationKeySet;
+import org.apache.ode.bpel.common.InstanceFilter;
+import org.apache.ode.bpel.dao.CorrelatorDAO;
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
+import org.apache.ode.bpel.dao.ProcessDAO;
+import org.apache.ode.bpel.dao.ProcessInstanceDAO;
+import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY;
+import org.apache.ode.daohib.SessionManager;
+import org.apache.ode.daohib.bpel.hobj.HCorrelator;
+
+/**
+ * Testing BpelDAOConnectionImpl.listInstance. We're just producing a lot of
+ * different filter combinations and test if they execute ok. To really test
+ * that the result is the one expected would take a huge test database (with at
+ * least a process and an instance for every possible combination).
+ */
+public class MexTest extends BaseTestDAO {
+
+    private Map<String, List> filterElmts;
+    private ArrayList<String> order;
+
+    protected void setUp() throws Exception {
+        initTM();
+    }
+
+    protected void tearDown() throws Exception {
+        stopTM();
+    }
+
+    public void test() throws Exception {
+        MessageExchangeDAO mex = daoConn.createMessageExchange('M');
+        mex.lockPremieMessages();
+
+        SessionManager sm = ((BpelDAOConnectionImpl) daoConn)._sm;
+        HCorrelator correlator = new HCorrelator();
+        correlator.setCorrelatorId("abc");
+        sm.getSession().save(correlator);
+        new CorrelatorDaoImpl(sm, correlator).dequeueMessage(new CorrelationKeySet("@2[12~a~b]"));
+    }
+
+    public void testCleanup() throws Exception {
+        SessionManager sm = ((BpelDAOConnectionImpl) daoConn)._sm;
+        ProcessDAO p = daoConn.createProcess(QName.valueOf("abc"), QName.valueOf("abc"), "abc", 1);
+        CorrelatorDAO correlator = p.addCorrelator("abc");
+        ProcessInstanceDAO instance = p.createInstance(correlator);
+
+        MessageExchangeDAO mex = daoConn.createMessageExchange('M');
+        mex.setProperty("abc", "def");
+        mex.setInstance(instance);
+
+        txm.commit();
+        txm.begin();
+        assertEquals(1, sm.getSession().createSQLQuery("select count(*) from BPEL_MEX_PROPS").list().get(0));
+        
+        Set<CLEANUP_CATEGORY> cleanupCategories = EnumSet.allOf(CLEANUP_CATEGORY.class);
+        instance.delete(cleanupCategories);
+        txm.commit();
+        txm.begin();
+
+        assertEquals(0, sm.getSession().createSQLQuery("select count(*) from BPEL_MEX_PROPS").list().get(0));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java
----------------------------------------------------------------------
diff --git a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java
index d9e5617..c0f35ff 100644
--- a/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java
+++ b/dao-jpa/src/main/java/org/apache/ode/dao/jpa/MessageExchangeDAOImpl.java
@@ -383,4 +383,8 @@ public class MessageExchangeDAOImpl extends OpenJPADAO implements MessageExchang
     public void setCreateTime(Date createTime) {
         _createTime = createTime;
     }
+
+    public boolean lockPremieMessages() {
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ode/blob/566d3d1a/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
----------------------------------------------------------------------
diff --git a/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java b/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
index 1ea83d6..0d4f622 100644
--- a/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
+++ b/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
@@ -458,10 +458,14 @@ public final class JacobVPU {
                 __log.error(msg, iae);
                 throw new RuntimeException(msg, iae);
             } catch (InvocationTargetException e) {
-                String msg = __msgs.msgClientMethodException(_method.getName(),
-                        _methodBody.getClass().getName());
-                __log.error(msg, e.getTargetException());
-                throw new RuntimeException(e.getTargetException());
+                if (e.getTargetException() instanceof RuntimeException) {
+                    throw (RuntimeException) e.getTargetException();
+                } else {
+                    String msg = __msgs.msgClientMethodException(_method.getName(),
+                            _methodBody.getClass().getName());
+                    __log.error(msg, e.getTargetException());
+                    throw new RuntimeException(e.getTargetException());
+                }
             } finally {
                 ctime = System.currentTimeMillis() - ctime;
                 _statistics.totalClientTimeMs += ctime;


[2/2] ode git commit: Avoid Read/Write Deadlocks in Hibernate, Ported from trunk commit. https://github.com/apache/ode/commit/19b0786f9ec8928fca956597326004196bf61eaa

Posted by sa...@apache.org.
Avoid Read/Write Deadlocks in Hibernate, Ported from trunk commit.
https://github.com/apache/ode/commit/19b0786f9ec8928fca956597326004196bf61eaa


Project: http://git-wip-us.apache.org/repos/asf/ode/repo
Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/ba61210b
Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/ba61210b
Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/ba61210b

Branch: refs/heads/ode-1.3.x
Commit: ba61210b426b4cb12596a46ad368f3a25bff6794
Parents: 566d3d1
Author: sathwik <sa...@apache.org>
Authored: Wed Jan 6 13:39:10 2016 +0530
Committer: sathwik <sa...@apache.org>
Committed: Wed Jan 6 13:39:10 2016 +0530

----------------------------------------------------------------------
 .../src/main/java/org/apache/ode/bpel/engine/BpelProcess.java  | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ode/blob/ba61210b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
----------------------------------------------------------------------
diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
index afe669a..b5cef27 100644
--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
@@ -265,9 +265,9 @@ public class BpelProcess {
 
         // For a one way, once the engine is done, the mex can be safely released.
         // Sean: not really, if route is not found, we cannot delete the mex yet
-        if (mex.getPattern().equals(MessageExchange.MessageExchangePattern.REQUEST_ONLY) && routed && getCleanupCategories(false).contains(CLEANUP_CATEGORY.MESSAGES)) {
-            mex.release();
-        }
+//        if (mex.getPattern().equals(MessageExchange.MessageExchangePattern.REQUEST_ONLY) && routed && getCleanupCategories(false).contains(CLEANUP_CATEGORY.MESSAGES)) {
+//            mex.release();
+//        }
 
         return routed;
     }