You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2016/05/16 17:24:54 UTC

hive git commit: HIVE-13753 : Make metastore client thread safe in DbTxnManager (Wei Zheng, reviewed by Vaibhav Gumashta)

Repository: hive
Updated Branches:
  refs/heads/master 6cb5dbe64 -> bb1ee8167


HIVE-13753 : Make metastore client thread safe in DbTxnManager (Wei Zheng, reviewed by Vaibhav Gumashta)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bb1ee816
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bb1ee816
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bb1ee816

Branch: refs/heads/master
Commit: bb1ee8167006fb8ae7868502d95ebc31f6ea3dd5
Parents: 6cb5dbe
Author: Wei Zheng <we...@apache.org>
Authored: Mon May 16 10:24:39 2016 -0700
Committer: Wei Zheng <we...@apache.org>
Committed: Mon May 16 10:24:39 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/lockmgr/DbLockManager.java   |  4 +-
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java    | 50 +++++++++++++++++++-
 2 files changed, 50 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/bb1ee816/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
index 2804514..b4ae1d1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
@@ -54,11 +54,11 @@ public class DbLockManager implements HiveLockManager{
   private long MAX_SLEEP;
   //longer term we should always have a txn id and then we won't need to track locks here at all
   private Set<DbHiveLock> locks;
-  private IMetaStoreClient client;
+  private DbTxnManager.SynchronizedMetaStoreClient client;
   private long nextSleep = 50;
   private final HiveConf conf;
 
-  DbLockManager(IMetaStoreClient client, HiveConf conf) {
+  DbLockManager(DbTxnManager.SynchronizedMetaStoreClient client, HiveConf conf) {
     locks = new HashSet<>();
     this.client = client;
     this.conf = conf;

http://git-wip-us.apache.org/repos/asf/hive/blob/bb1ee816/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 4539e71..9ab6169 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -62,7 +62,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
 
   private DbLockManager lockMgr = null;
-  private IMetaStoreClient client = null;
+  private SynchronizedMetaStoreClient client = null;
   /**
    * The Metastore NEXT_TXN_ID.NTXN_NEXT is initialized to 1; it contains the next available
    * transaction id.  Thus is 1 is first transaction id.
@@ -520,7 +520,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
       }
       try {
         Hive db = Hive.get(conf);
-        client = db.getMSC();
+        client = new SynchronizedMetaStoreClient(db.getMSC());
         initHeartbeatExecutorService();
       } catch (MetaException e) {
         throw new LockException(ErrorMsg.METASTORE_COULD_NOT_INITIATE.getMsg(), e);
@@ -615,4 +615,50 @@ public class DbTxnManager extends HiveTxnManagerImpl {
       }
     }
   }
+
+  /**
+   * Serializes use of one IMetaStoreClient: every method holds this wrapper's monitor while delegating.
+   */
+  static final class SynchronizedMetaStoreClient { // static: nothing in here uses DbTxnManager instance state
+    private final IMetaStoreClient client; // NOTE(review): code holding this client directly bypasses the lock
+    SynchronizedMetaStoreClient(IMetaStoreClient client) { // keeps the reference as given (no copy, no null check)
+      this.client = client;
+    }

+    synchronized long openTxn(String user) throws TException { // returns the value of client.openTxn(user)
+      return client.openTxn(user);
+    }

+    synchronized void commitTxn(long txnid) throws TException { // TException from the client propagates unchanged
+      client.commitTxn(txnid);
+    }

+    synchronized void rollbackTxn(long txnid) throws TException {
+      client.rollbackTxn(txnid);
+    }

+    synchronized void heartbeat(long txnid, long lockid) throws TException { // both ids are passed through as-is
+      client.heartbeat(txnid, lockid);
+    }

+    synchronized ValidTxnList getValidTxns(long currentTxn) throws TException {
+      return client.getValidTxns(currentTxn);
+    }

+    synchronized LockResponse lock(LockRequest request) throws TException { // monitor is held for the whole call
+      return client.lock(request);
+    }

+    synchronized LockResponse checkLock(long lockid) throws TException {
+      return client.checkLock(lockid);
+    }

+    synchronized void unlock(long lockid) throws TException {
+      client.unlock(lockid);
+    }

+    synchronized ShowLocksResponse showLocks(ShowLocksRequest showLocksRequest) throws TException {
+      return client.showLocks(showLocksRequest);
+    }
+  }
 }