You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2016/05/16 17:24:54 UTC
hive git commit: HIVE-13753 : Make metastore client thread safe in
DbTxnManager (Wei Zheng, reviewed by Vaibhav Gumashta)
Repository: hive
Updated Branches:
refs/heads/master 6cb5dbe64 -> bb1ee8167
HIVE-13753 : Make metastore client thread safe in DbTxnManager (Wei Zheng, reviewed by Vaibhav Gumashta)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bb1ee816
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bb1ee816
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bb1ee816
Branch: refs/heads/master
Commit: bb1ee8167006fb8ae7868502d95ebc31f6ea3dd5
Parents: 6cb5dbe
Author: Wei Zheng <we...@apache.org>
Authored: Mon May 16 10:24:39 2016 -0700
Committer: Wei Zheng <we...@apache.org>
Committed: Mon May 16 10:24:39 2016 -0700
----------------------------------------------------------------------
.../hadoop/hive/ql/lockmgr/DbLockManager.java | 4 +-
.../hadoop/hive/ql/lockmgr/DbTxnManager.java | 50 +++++++++++++++++++-
2 files changed, 50 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/bb1ee816/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
index 2804514..b4ae1d1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
@@ -54,11 +54,11 @@ public class DbLockManager implements HiveLockManager{
private long MAX_SLEEP;
//longer term we should always have a txn id and then we won't need to track locks here at all
private Set<DbHiveLock> locks;
- private IMetaStoreClient client;
+ private DbTxnManager.SynchronizedMetaStoreClient client;
private long nextSleep = 50;
private final HiveConf conf;
- DbLockManager(IMetaStoreClient client, HiveConf conf) {
+ DbLockManager(DbTxnManager.SynchronizedMetaStoreClient client, HiveConf conf) {
locks = new HashSet<>();
this.client = client;
this.conf = conf;
http://git-wip-us.apache.org/repos/asf/hive/blob/bb1ee816/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 4539e71..9ab6169 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -62,7 +62,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
private DbLockManager lockMgr = null;
- private IMetaStoreClient client = null;
+ private SynchronizedMetaStoreClient client = null;
/**
* The Metastore NEXT_TXN_ID.NTXN_NEXT is initialized to 1; it contains the next available
* transaction id. Thus is 1 is first transaction id.
@@ -520,7 +520,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
}
try {
Hive db = Hive.get(conf);
- client = db.getMSC();
+ client = new SynchronizedMetaStoreClient(db.getMSC());
initHeartbeatExecutorService();
} catch (MetaException e) {
throw new LockException(ErrorMsg.METASTORE_COULD_NOT_INITIATE.getMsg(), e);
@@ -615,4 +615,50 @@ public class DbTxnManager extends HiveTxnManagerImpl {
}
}
}
+
+ /**
+ * Synchronized MetaStoreClient wrapper
+ */
+ final class SynchronizedMetaStoreClient {
+ private final IMetaStoreClient client;
+ SynchronizedMetaStoreClient(IMetaStoreClient client) {
+ this.client = client;
+ }
+
+ synchronized long openTxn(String user) throws TException {
+ return client.openTxn(user);
+ }
+
+ synchronized void commitTxn(long txnid) throws TException {
+ client.commitTxn(txnid);
+ }
+
+ synchronized void rollbackTxn(long txnid) throws TException {
+ client.rollbackTxn(txnid);
+ }
+
+ synchronized void heartbeat(long txnid, long lockid) throws TException {
+ client.heartbeat(txnid, lockid);
+ }
+
+ synchronized ValidTxnList getValidTxns(long currentTxn) throws TException {
+ return client.getValidTxns(currentTxn);
+ }
+
+ synchronized LockResponse lock(LockRequest request) throws TException {
+ return client.lock(request);
+ }
+
+ synchronized LockResponse checkLock(long lockid) throws TException {
+ return client.checkLock(lockid);
+ }
+
+ synchronized void unlock(long lockid) throws TException {
+ client.unlock(lockid);
+ }
+
+ synchronized ShowLocksResponse showLocks(ShowLocksRequest showLocksRequest) throws TException {
+ return client.showLocks(showLocksRequest);
+ }
+ }
}