You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by sa...@apache.org on 2015/08/18 12:25:06 UTC

[1/2] ode git commit: ODE-1033: incremented version number by holding DB exclusive lock

Repository: ode
Updated Branches:
  refs/heads/master fb731a748 -> b6b655d7a


ODE-1033: incremented version number by holding DB exclusive lock


Project: http://git-wip-us.apache.org/repos/asf/ode/repo
Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/97239d0f
Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/97239d0f
Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/97239d0f

Branch: refs/heads/master
Commit: 97239d0f557978fecd850cdc01e167460f101bec
Parents: fb731a7
Author: sathwik <sa...@apache.org>
Authored: Tue Aug 18 15:47:31 2015 +0530
Committer: sathwik <sa...@apache.org>
Committed: Tue Aug 18 15:47:31 2015 +0530

----------------------------------------------------------------------
 .../org/apache/ode/store/ProcessStoreImpl.java  | 22 +++++++----
 .../ode/store/hib/ConfStoreConnectionHib.java   | 27 +++++++++----
 .../ode/store/jpa/ConfStoreConnectionJpa.java   | 41 +++++++++++++++-----
 .../store/jpa/DbConfStoreConnectionFactory.java |  1 +
 4 files changed, 66 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ode/blob/97239d0f/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java
----------------------------------------------------------------------
diff --git a/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java b/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java
index cf7d194..fabd531 100644
--- a/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java
+++ b/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java
@@ -101,7 +101,7 @@ public class ProcessStoreImpl implements ProcessStore {
      */
     private DataSource _inMemDs;
 
-
+    private static final ThreadLocal<Long> _currentVersion = new ThreadLocal<Long>();
 
     public ProcessStoreImpl() {
         this(null, null, "", new OdeConfigProperties(new Properties(), ""), true);
@@ -195,11 +195,12 @@ public class ProcessStoreImpl implements ProcessStore {
         long version;
         if (autoincrementVersion || du.getStaticVersion() == -1) {
             // Process and DU use a monotonically increased single version number by default.
-            version = exec(new Callable<Long>() {
-                public Long call(ConfStoreConnection conn) {
-                    return conn.getNextVersion();
-                }
-            });
+            try {
+                version = getCurrentVersion();
+            } finally {
+                //we need to reset the current version thread local value.
+                _currentVersion.set(null);
+            }
         } else {
             version = du.getStaticVersion();
         }
@@ -296,7 +297,6 @@ public class ProcessStoreImpl implements ProcessStore {
                             newDao.setProperty(prop.getKey(), DOMUtils.domToString(prop.getValue()));
                         }
                         deployed.add(pc.getProcessId());
-                        conn.setVersion(pc.getVersion());
                     } catch (Throwable e) {
                         String errmsg = "Error persisting deployment record for " + pc.getProcessId()
                                 + "; process will not be available after restart!";
@@ -583,12 +583,18 @@ public class ProcessStoreImpl implements ProcessStore {
     }
 
     public long getCurrentVersion() {
+        if (_currentVersion.get() != null){
+            return _currentVersion.get();
+        }
+
         long version = exec(new Callable<Long>() {
             public Long call(ConfStoreConnection conn) {
                 return conn.getNextVersion();
             }
         });
-        return version;
+
+        _currentVersion.set(version);
+        return _currentVersion.get();
     }
 
     protected void fireEvent(ProcessStoreEvent pse) {

http://git-wip-us.apache.org/repos/asf/ode/blob/97239d0f/bpel-store/src/main/java/org/apache/ode/store/hib/ConfStoreConnectionHib.java
----------------------------------------------------------------------
diff --git a/bpel-store/src/main/java/org/apache/ode/store/hib/ConfStoreConnectionHib.java b/bpel-store/src/main/java/org/apache/ode/store/hib/ConfStoreConnectionHib.java
index c8469ae..a7c7099 100644
--- a/bpel-store/src/main/java/org/apache/ode/store/hib/ConfStoreConnectionHib.java
+++ b/bpel-store/src/main/java/org/apache/ode/store/hib/ConfStoreConnectionHib.java
@@ -25,6 +25,8 @@ import org.apache.ode.store.DeploymentUnitDAO;
 import org.apache.ode.store.ProcessConfDAO;
 import org.hibernate.Criteria;
 import org.hibernate.HibernateException;
+import org.hibernate.LockMode;
+import org.hibernate.Query;
 import org.hibernate.Session;
 
 import javax.xml.namespace.QName;
@@ -77,21 +79,32 @@ public class ConfStoreConnectionHib implements ConfStoreConnection {
     }
 
     public long getNextVersion() {
-        VersionTrackerDAOImpl vt = (VersionTrackerDAOImpl)
-                _session.createQuery("from VersionTrackerDAOImpl v ").uniqueResult();
-        if (vt == null) return 1;
-        else return vt.getVersion() + 1;
+        Query q = _session.createQuery("from VersionTrackerDAOImpl v ");
+        q.setLockMode("v", LockMode.UPGRADE);
+        VersionTrackerDAOImpl vt = (VersionTrackerDAOImpl) q.uniqueResult();
+        if (vt == null) {
+            vt = new VersionTrackerDAOImpl();
+            vt.setVersion(1);
+        }else {
+            vt.setVersion(vt.getVersion() + 1);
+        }
+        _session.save(vt);
+        return vt.getVersion();
     }
 
     public void setVersion(long version) {
         _session.flush();
-        VersionTrackerDAOImpl vt = (VersionTrackerDAOImpl)
-                _session.createQuery("from VersionTrackerDAOImpl v ").uniqueResult();
+
+        Query q = _session.createQuery("from VersionTrackerDAOImpl v ");
+        q.setLockMode("v", LockMode.UPGRADE);
+        VersionTrackerDAOImpl vt = (VersionTrackerDAOImpl) q.uniqueResult();
         if (vt == null) {
             vt = new VersionTrackerDAOImpl();
             vt.setId(1);
+            vt.setVersion(1);
+        } else {
+            vt.setVersion(version);
         }
-        vt.setVersion(version);
         _session.save(vt);
     }
 

http://git-wip-us.apache.org/repos/asf/ode/blob/97239d0f/bpel-store/src/main/java/org/apache/ode/store/jpa/ConfStoreConnectionJpa.java
----------------------------------------------------------------------
diff --git a/bpel-store/src/main/java/org/apache/ode/store/jpa/ConfStoreConnectionJpa.java b/bpel-store/src/main/java/org/apache/ode/store/jpa/ConfStoreConnectionJpa.java
index fa2f4a7..72a9df9 100644
--- a/bpel-store/src/main/java/org/apache/ode/store/jpa/ConfStoreConnectionJpa.java
+++ b/bpel-store/src/main/java/org/apache/ode/store/jpa/ConfStoreConnectionJpa.java
@@ -25,6 +25,7 @@ import org.apache.ode.store.ConfStoreConnection;
 import org.apache.ode.store.DeploymentUnitDAO;
 
 import javax.persistence.EntityManager;
+import javax.persistence.Query;
 import java.util.Collection;
 import java.util.Date;
 import java.util.List;
@@ -64,19 +65,39 @@ public class ConfStoreConnectionJpa implements ConfStoreConnection {
     }
 
     public long getNextVersion() {
-        List<VersionTrackerDAOImpl> res = _em.createQuery("select v from VersionTrackerDAOImpl v").getResultList();
-        if (res.size() == 0) return 1;
-        else {
-            VersionTrackerDAOImpl vt = res.get(0);
-            return vt.getVersion() + 1;
-        }
+        VersionTrackerDAOImpl vt = null;
+        Query query = _em.createQuery("select v from VersionTrackerDAOImpl v");
+        query.setHint("openjpa.FetchPlan.ReadLockMode", "WRITE");
+
+        List<VersionTrackerDAOImpl> res = query.getResultList();
+
+        if(!res.isEmpty())
+            vt = res.get(0);
+
+        if (vt == null) {
+            vt = new VersionTrackerDAOImpl();
+            vt.setVersion(1);
+        } else {
+            vt.setVersion(vt.getVersion() + 1);
+         }
+
+        _em.persist(vt);
+        return vt.getVersion();
     }
 
     public void setVersion(long version) {
-        List<VersionTrackerDAOImpl> res = _em.createQuery("select v from VersionTrackerDAOImpl v").getResultList();
-        VersionTrackerDAOImpl vt;
-        if (res.size() == 0) vt = new VersionTrackerDAOImpl();
-        else vt = res.get(0);
+        VersionTrackerDAOImpl vt = null;
+        Query query = _em.createQuery("select v from VersionTrackerDAOImpl v");
+        query.setHint("openjpa.FetchPlan.ReadLockMode", "WRITE");
+
+        List<VersionTrackerDAOImpl> res = query.getResultList();
+
+        if(!res.isEmpty())
+            vt = res.get(0);
+
+        if (vt == null)
+            vt = new VersionTrackerDAOImpl();
+
         vt.setVersion(version);
         _em.persist(vt);
     }

http://git-wip-us.apache.org/repos/asf/ode/blob/97239d0f/bpel-store/src/main/java/org/apache/ode/store/jpa/DbConfStoreConnectionFactory.java
----------------------------------------------------------------------
diff --git a/bpel-store/src/main/java/org/apache/ode/store/jpa/DbConfStoreConnectionFactory.java b/bpel-store/src/main/java/org/apache/ode/store/jpa/DbConfStoreConnectionFactory.java
index bd8508f..407e18a 100644
--- a/bpel-store/src/main/java/org/apache/ode/store/jpa/DbConfStoreConnectionFactory.java
+++ b/bpel-store/src/main/java/org/apache/ode/store/jpa/DbConfStoreConnectionFactory.java
@@ -60,6 +60,7 @@ public class DbConfStoreConnectionFactory implements ConfStoreConnectionFactory
         propMap.put("openjpa.ConnectionFactoryMode", "managed");
         propMap.put("openjpa.FlushBeforeQueries", "false");
         propMap.put("openjpa.FetchBatchSize", 1000);
+        propMap.put("openjpa.LockManager","pessimistic");
 
         //dirty hack for ODE-1015
         String skipIsolation = System.getProperty("openjpa.connection.isolation.skip", "N");


[2/2] ode git commit: ODE-1026: Acquire readLock while firing events to avoid deadlock

Posted by sa...@apache.org.
ODE-1026: Acquire readLock while firing events to avoid deadlock


Project: http://git-wip-us.apache.org/repos/asf/ode/repo
Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/b6b655d7
Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/b6b655d7
Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/b6b655d7

Branch: refs/heads/master
Commit: b6b655d7a18ce305c5260c92f6d3c0010a613b88
Parents: 97239d0
Author: sathwik <sa...@apache.org>
Authored: Tue Aug 18 15:50:24 2015 +0530
Committer: sathwik <sa...@apache.org>
Committed: Tue Aug 18 15:50:24 2015 +0530

----------------------------------------------------------------------
 .../main/java/org/apache/ode/store/ProcessStoreImpl.java | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ode/blob/b6b655d7/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java
----------------------------------------------------------------------
diff --git a/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java b/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java
index fabd531..ba911a7 100644
--- a/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java
+++ b/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java
@@ -308,7 +308,9 @@ public class ProcessStoreImpl implements ProcessStore {
 
         });
 
-        // We want the events to be fired outside of the bounds of the writelock.
+
+        _rw.readLock().lock();
+        boolean readLockHeld = true;
         try {
             for (ProcessConfImpl process : processes) {
                 fireEvent(new ProcessStoreEvent(ProcessStoreEvent.Type.DEPLOYED, process.getProcessId(), process.getDeploymentUnit()
@@ -316,11 +318,18 @@ public class ProcessStoreImpl implements ProcessStore {
                 fireStateChange(process.getProcessId(), process.getState(), process.getDeploymentUnit().getName());
             }
         } catch (Exception e) {
+            //need to unlock as undeploy operation will need a writeLock
+            _rw.readLock().unlock();
+            readLockHeld = false;
+
             // A problem at that point means that engine deployment failed, we don't want the store to keep the du
             __log.warn("Deployment failed within the engine, store undeploying process.", e);
             undeploy(deploymentUnitDirectory);
             if (e instanceof ContextException) throw (ContextException) e;
             else throw new ContextException("Deployment failed within the engine. " + e.getMessage(), e);
+        } finally {
+            if(readLockHeld)
+                _rw.readLock().unlock();
         }
 
         return deployed;