You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2016/12/22 19:06:48 UTC

hive git commit: HIVE-15376 : Improve heartbeater scheduling for transactions (Wei Zheng, reviewed by Eugene Koifman)

Repository: hive
Updated Branches:
  refs/heads/master 71f250db3 -> ee35ccb19


HIVE-15376 : Improve heartbeater scheduling for transactions (Wei Zheng, reviewed by Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ee35ccb1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ee35ccb1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ee35ccb1

Branch: refs/heads/master
Commit: ee35ccb19f04f4437c835a7ac960e6b9762d5164
Parents: 71f250d
Author: Wei Zheng <we...@apache.org>
Authored: Thu Dec 22 11:06:36 2016 -0800
Committer: Wei Zheng <we...@apache.org>
Committed: Thu Dec 22 11:06:36 2016 -0800

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hive/ql/Driver.java  |  2 +-
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java    | 12 ++-
 .../hadoop/hive/ql/lockmgr/DummyTxnManager.java |  2 +-
 .../hadoop/hive/ql/lockmgr/HiveTxnManager.java  |  5 +-
 .../hive/ql/lockmgr/TestDbTxnManager.java       | 55 +++++++++-----
 .../hive/ql/lockmgr/TestDbTxnManager2.java      | 80 ++++++++++----------
 .../clientpositive/dbtxnmgr_showlocks.q.out     | 16 ++--
 7 files changed, 99 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/ee35ccb1/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 1bc0956..fd6020b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1102,7 +1102,7 @@ public class Driver implements CommandProcessor {
           throw new RuntimeException("Already have an open transaction txnid:" + txnMgr.getCurrentTxnId());
         }
         // We are writing to tables in an ACID compliant way, so we need to open a transaction
-        txnMgr.openTxn(userFromUGI);
+        txnMgr.openTxn(ctx, userFromUGI);
         initiatingTransaction = true;
       }
       else {

http://git-wip-us.apache.org/repos/asf/hive/blob/ee35ccb1/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index a79c106..28ca77c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -135,7 +135,12 @@ public class DbTxnManager extends HiveTxnManagerImpl {
   }
 
   @Override
-  public long openTxn(String user) throws LockException {
+  public long openTxn(Context ctx, String user) throws LockException {
+    return openTxn(ctx, user, 0);
+  }
+
+  @VisibleForTesting
+  long openTxn(Context ctx, String user, long delay) throws LockException {
     //todo: why don't we lock the snapshot here???  Instead of having client make an explicit call
     //whenever it chooses
     init();
@@ -146,6 +151,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
       txnId = client.openTxn(user);
       statementId = 0;
       LOG.debug("Opened " + JavaUtils.txnIdToString(txnId));
+      ctx.setHeartbeater(startHeartbeat(delay));
       return txnId;
     } catch (TException e) {
       throw new LockException(e, ErrorMsg.METASTORE_COMMUNICATION_FAILED);
@@ -353,7 +359,9 @@ public class DbTxnManager extends HiveTxnManagerImpl {
   @VisibleForTesting
   void acquireLocksWithHeartbeatDelay(QueryPlan plan, Context ctx, String username, long delay) throws LockException {
     LockState ls = acquireLocks(plan, ctx, username, true);
-    if (ls != null) { // If there's no lock, we don't need to do heartbeat
+    if (ls != null && !isTxnOpen()) { // If there's no lock, we don't need to do heartbeat
+      // Start heartbeat for read-only queries which don't open transactions but requires locks.
+      // For those that require transactions, the heartbeat has already been started in openTxn.
       ctx.setHeartbeater(startHeartbeat(delay));
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/ee35ccb1/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
index f001f59..24fbd9a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
@@ -49,7 +49,7 @@ class DummyTxnManager extends HiveTxnManagerImpl {
   private HiveLockManager lockMgr;
 
   @Override
-  public long openTxn(String user) throws LockException {
+  public long openTxn(Context ctx, String user) throws LockException {
     // No-op
     return 0L;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/ee35ccb1/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index 5b9ad60..ce220a2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -38,11 +38,12 @@ public interface HiveTxnManager {
 
   /**
    * Open a new transaction.
+   * @param ctx Context for this query
    * @param user Hive user who is opening this transaction.
    * @return The new transaction id
    * @throws LockException if a transaction is already open.
    */
-  long openTxn(String user) throws LockException;
+  long openTxn(Context ctx, String user) throws LockException;
 
   /**
    * Get the lock manager.  This must be used rather than instantiating an
@@ -55,7 +56,7 @@ public interface HiveTxnManager {
 
   /**
    * Acquire all of the locks needed by a query.  If used with a query that
-   * requires transactions, this should be called after {@link #openTxn(String)}.
+   * requires transactions, this should be called after {@link #openTxn(Context, String)}.
    * A list of acquired locks will be stored in the
    * {@link org.apache.hadoop.hive.ql.Context} object and can be retrieved
    * via {@link org.apache.hadoop.hive.ql.Context#getHiveLocks}.

http://git-wip-us.apache.org/repos/asf/hive/blob/ee35ccb1/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index 460bad5..3f99571 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -139,7 +139,7 @@ public class TestDbTxnManager {
   public void testSingleWriteTable() throws Exception {
     WriteEntity we = addTableOutput(WriteEntity.WriteType.INSERT);
     QueryPlan qp = new MockQueryPlan(this);
-    txnMgr.openTxn("fred");
+    txnMgr.openTxn(ctx, "fred");
     txnMgr.acquireLocks(qp, ctx, "fred");
     List<HiveLock> locks = ctx.getHiveLocks();
     Assert.assertEquals(1, locks.size());
@@ -155,7 +155,7 @@ public class TestDbTxnManager {
   public void testSingleWritePartition() throws Exception {
     WriteEntity we = addPartitionOutput(newTable(true), WriteEntity.WriteType.INSERT);
     QueryPlan qp = new MockQueryPlan(this);
-    txnMgr.openTxn("fred");
+    txnMgr.openTxn(ctx, "fred");
     txnMgr.acquireLocks(qp, ctx, "fred");
     List<HiveLock> locks = ctx.getHiveLocks();
     Assert.assertEquals(1, locks.size());
@@ -170,7 +170,7 @@ public class TestDbTxnManager {
   public void testWriteDynamicPartition() throws Exception {
     WriteEntity we = addDynamicPartitionedOutput(newTable(true), WriteEntity.WriteType.INSERT);
     QueryPlan qp = new MockQueryPlan(this);
-    txnMgr.openTxn("fred");
+    txnMgr.openTxn(ctx, "fred");
     txnMgr.acquireLocks(qp, ctx, "fred");
     List<HiveLock> locks = ctx.getHiveLocks();
     Assert.assertEquals(1, locks.size());
@@ -213,7 +213,7 @@ public class TestDbTxnManager {
   public void testExceptions() throws Exception {
     addPartitionOutput(newTable(true), WriteEntity.WriteType.INSERT);
     QueryPlan qp = new MockQueryPlan(this);
-    txnMgr.openTxn("NicholasII");
+    ((DbTxnManager) txnMgr).openTxn(ctx, "NicholasII", HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2);
     Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS));
     runReaper();
     LockException exception = null;
@@ -227,7 +227,7 @@ public class TestDbTxnManager {
     Assert.assertEquals("Wrong Exception1", ErrorMsg.TXN_ABORTED, exception.getCanonicalErrorMsg());
 
     exception = null;
-    txnMgr.openTxn("AlexanderIII");
+    ((DbTxnManager) txnMgr).openTxn(ctx, "AlexanderIII", HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2);
     Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS));
     runReaper();
     try {
@@ -245,26 +245,43 @@ public class TestDbTxnManager {
     addPartitionInput(newTable(true));
     QueryPlan qp = new MockQueryPlan(this);
     //make sure it works with nothing to expire
-    expireLocks(txnMgr, 0);
+    testLockExpiration(txnMgr, 0, true);
+
     //create a few read locks, all on the same resource
     for(int i = 0; i < 5; i++) {
       ((DbTxnManager)txnMgr).acquireLocks(qp, ctx, "PeterI" + i, true); // No heartbeat
     }
-    expireLocks(txnMgr, 5);
+    testLockExpiration(txnMgr, 5, true);
+
     //create a lot of locks
     for(int i = 0; i < TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17; i++) {
       ((DbTxnManager)txnMgr).acquireLocks(qp, ctx, "PeterI" + i, true); // No heartbeat
     }
-    expireLocks(txnMgr, TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17);
+    testLockExpiration(txnMgr, TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17, true);
+
+    // Create a lock, but send the heartbeat with a long delay. The lock will get expired.
+    ((DbTxnManager)txnMgr).acquireLocksWithHeartbeatDelay(qp, ctx, "bob",
+        HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 10);
+    testLockExpiration(txnMgr, 1, true);
+
+    // Create a lock and trigger a heartbeat. With heartbeat, the lock won't expire.
+    txnMgr.acquireLocks(qp, ctx, "peter");
+    testLockExpiration(txnMgr, 1, false);
   }
-  private void expireLocks(HiveTxnManager txnMgr, int numLocksBefore) throws Exception {
+
+  private void testLockExpiration(HiveTxnManager txnMgr, int numLocksBefore, boolean shouldExpire) throws Exception {
     DbLockManager lockManager = (DbLockManager)txnMgr.getLockManager();
     ShowLocksResponse resp = lockManager.getLocks();
     Assert.assertEquals("Wrong number of locks before expire", numLocksBefore, resp.getLocks().size());
     Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS));
     runReaper();
     resp = lockManager.getLocks();
-    Assert.assertEquals("Expected all locks to expire", 0, resp.getLocks().size());
+    if (shouldExpire) {
+      Assert.assertEquals("Expected all locks to expire", 0, resp.getLocks().size());
+      lockManager.clearLocalLockRecords();
+    } else {
+      Assert.assertEquals("No lock should expire because there is heartbeating", numLocksBefore, resp.getLocks().size());
+    }
   }
 
   @Test
@@ -275,7 +292,7 @@ public class TestDbTxnManager {
     addPartitionInput(t);
     WriteEntity we = addTableOutput(WriteEntity.WriteType.INSERT);
     QueryPlan qp = new MockQueryPlan(this);
-    txnMgr.openTxn("fred");
+    txnMgr.openTxn(ctx, "fred");
     txnMgr.acquireLocks(qp, ctx, "fred");
     List<HiveLock> locks = ctx.getHiveLocks();
     Assert.assertEquals(1, locks.size());
@@ -290,7 +307,7 @@ public class TestDbTxnManager {
   public void testUpdate() throws Exception {
     WriteEntity we = addTableOutput(WriteEntity.WriteType.UPDATE);
     QueryPlan qp = new MockQueryPlan(this);
-    txnMgr.openTxn("fred");
+    txnMgr.openTxn(ctx, "fred");
     txnMgr.acquireLocks(qp, ctx, "fred");
     List<HiveLock> locks = ctx.getHiveLocks();
     Assert.assertEquals(1, locks.size());
@@ -305,7 +322,7 @@ public class TestDbTxnManager {
   public void testDelete() throws Exception {
     WriteEntity we = addTableOutput(WriteEntity.WriteType.DELETE);
     QueryPlan qp = new MockQueryPlan(this);
-    txnMgr.openTxn("fred");
+    txnMgr.openTxn(ctx, "fred");
     txnMgr.acquireLocks(qp, ctx, "fred");
     List<HiveLock> locks = ctx.getHiveLocks();
     Assert.assertEquals(1, locks.size());
@@ -320,7 +337,7 @@ public class TestDbTxnManager {
   public void testRollback() throws Exception {
     WriteEntity we = addTableOutput(WriteEntity.WriteType.DELETE);
     QueryPlan qp = new MockQueryPlan(this);
-    txnMgr.openTxn("fred");
+    txnMgr.openTxn(ctx, "fred");
     txnMgr.acquireLocks(qp, ctx, "fred");
     List<HiveLock> locks = ctx.getHiveLocks();
     Assert.assertEquals(1, locks.size());
@@ -404,7 +421,7 @@ public class TestDbTxnManager {
     QueryPlan qp = new MockQueryPlan(this);
 
     // Case 1: If there's no delay for the heartbeat, txn should be able to commit
-    txnMgr.openTxn("fred");
+    txnMgr.openTxn(ctx, "fred");
     txnMgr.acquireLocks(qp, ctx, "fred"); // heartbeat started..
     runReaper();
     try {
@@ -417,10 +434,10 @@ public class TestDbTxnManager {
 
     // Case 2: If there's delay for the heartbeat, but the delay is within the reaper's tolerance,
     //         then txt should be able to commit
-    txnMgr.openTxn("tom");
     // Start the heartbeat after a delay, which is shorter than  the HIVE_TXN_TIMEOUT
-    ((DbTxnManager) txnMgr).acquireLocksWithHeartbeatDelay(qp, ctx, "tom",
+    ((DbTxnManager) txnMgr).openTxn(ctx, "tom",
         HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) / 2);
+    txnMgr.acquireLocks(qp, ctx, "tom");
     runReaper();
     try {
       txnMgr.commitTxn();
@@ -433,9 +450,9 @@ public class TestDbTxnManager {
     // Case 3: If there's delay for the heartbeat, and the delay is long enough to trigger the reaper,
     //         then the txn will time out and be aborted.
     //         Here we just don't send the heartbeat at all - an infinite delay.
-    txnMgr.openTxn("jerry");
     // Start the heartbeat after a delay, which exceeds the HIVE_TXN_TIMEOUT
-    ((DbTxnManager) txnMgr).acquireLocks(qp, ctx, "jerry", true);
+    ((DbTxnManager) txnMgr).openTxn(ctx, "jerry", HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2);
+    txnMgr.acquireLocks(qp, ctx, "jerry");
     Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS));
     runReaper();
     try {

http://git-wip-us.apache.org/repos/asf/hive/blob/ee35ccb1/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 3c9358d..725558f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -107,7 +107,7 @@ public class TestDbTxnManager2 {
     checkCmdOnDriver(driver.run("create table if not exists R (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"));
 
     checkCmdOnDriver(driver.compileAndRespond("delete from S where a in (select a from T where b = 1)"));
-    txnMgr.openTxn("one");
+    txnMgr.openTxn(ctx, "one");
     txnMgr.acquireLocks(driver.getPlan(), ctx, "one");
     List<ShowLocksResponseElement> locks = getLocks();
     Assert.assertEquals("Unexpected lock count", 2, locks.size());
@@ -116,7 +116,7 @@ public class TestDbTxnManager2 {
     txnMgr.rollbackTxn();
 
     checkCmdOnDriver(driver.compileAndRespond("update S set a = 7 where a in (select a from T where b = 1)"));
-    txnMgr.openTxn("one");
+    txnMgr.openTxn(ctx, "one");
     txnMgr.acquireLocks(driver.getPlan(), ctx, "one");
     locks = getLocks();
     Assert.assertEquals("Unexpected lock count", 2, locks.size());
@@ -125,7 +125,7 @@ public class TestDbTxnManager2 {
     txnMgr.rollbackTxn();
 
     checkCmdOnDriver(driver.compileAndRespond("insert into R select * from S where a in (select a from T where b = 1)"));
-    txnMgr.openTxn("three");
+    txnMgr.openTxn(ctx, "three");
     txnMgr.acquireLocks(driver.getPlan(), ctx, "three");
     locks = getLocks();
     Assert.assertEquals("Unexpected lock count", 3, locks.size());
@@ -227,7 +227,7 @@ public class TestDbTxnManager2 {
     checkCmdOnDriver(cpr);
     cpr = driver.compileAndRespond("update temp.T7 set a = 5 where b = 6");
     checkCmdOnDriver(cpr);
-    txnMgr.openTxn("Fifer");
+    txnMgr.openTxn(ctx, "Fifer");
     txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");
     checkCmdOnDriver(driver.compileAndRespond("drop database if exists temp"));
     HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
@@ -253,12 +253,12 @@ public class TestDbTxnManager2 {
     checkCmdOnDriver(cpr);
     cpr = driver.compileAndRespond("delete from T8 where b = 89");
     checkCmdOnDriver(cpr);
-    txnMgr.openTxn("Fifer");
+    txnMgr.openTxn(ctx, "Fifer");
     txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");//gets SS lock on T8
     cpr = driver.compileAndRespond("select a from T8");//gets S lock on T8
     checkCmdOnDriver(cpr);
     HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
-    txnMgr2.openTxn("Fiddler");
+    txnMgr2.openTxn(ctx, "Fiddler");
     txnMgr2.acquireLocks(driver.getPlan(), ctx, "Fiddler");
     checkCmdOnDriver(driver.compileAndRespond("update T8 set a = 1 where b = 1"));
     ((DbTxnManager) txnMgr2).acquireLocks(driver.getPlan(), ctx, "Practical", false);//waits for SS lock on T8 from fifer
@@ -662,7 +662,7 @@ public class TestDbTxnManager2 {
       "clustered by (na) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='false')"));
     checkCmdOnDriver(driver.run("insert into tab_acid partition(p) (a,b,p) values(1,2,'foo'),(3,4,'bar')"));
     checkCmdOnDriver(driver.run("insert into tab_not_acid partition(np) (na,nb,np) values(1,2,'blah'),(3,4,'doh')"));
-    txnMgr.openTxn("T1");
+    txnMgr.openTxn(ctx, "T1");
     checkCmdOnDriver(driver.compileAndRespond("select * from tab_acid inner join tab_not_acid on a = na"));
     txnMgr.acquireLocks(driver.getPlan(), ctx, "T1");
     List<ShowLocksResponseElement> locks = getLocks(txnMgr, true);
@@ -675,7 +675,7 @@ public class TestDbTxnManager2 {
     checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "tab_not_acid", "np=doh", locks);
 
     HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
-    txnMgr2.openTxn("T2");
+    txnMgr2.openTxn(ctx, "T2");
     checkCmdOnDriver(driver.compileAndRespond("insert into tab_not_acid partition(np='doh') values(5,6)"));
     LockState ls = ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "T2", false);
     locks = getLocks(txnMgr2, true);
@@ -899,12 +899,12 @@ public class TestDbTxnManager2 {
     checkCmdOnDriver(cpr);
 
     checkCmdOnDriver(driver.compileAndRespond("select * from TAB_PART"));
-    txnMgr.openTxn("Nicholas");
+    txnMgr.openTxn(ctx, "Nicholas");
     checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
     txnMgr.acquireLocks(driver.getPlan(), ctx, "Nicholas");
     HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
     txnMgr.commitTxn();
-    txnMgr2.openTxn("Alexandra");
+    txnMgr2.openTxn(ctx, "Alexandra");
     checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
     txnMgr2.acquireLocks(driver.getPlan(), ctx, "Nicholas");
     txnMgr2.commitTxn();
@@ -928,10 +928,10 @@ public class TestDbTxnManager2 {
     checkCmdOnDriver(cpr);
 
     HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
-    txnMgr.openTxn("Peter");
+    txnMgr.openTxn(ctx, "Peter");
     checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
     txnMgr.acquireLocks(driver.getPlan(), ctx, "Peter");
-    txnMgr2.openTxn("Catherine");
+    txnMgr2.openTxn(ctx, "Catherine");
     List<ShowLocksResponseElement> locks = getLocks(txnMgr);
     Assert.assertEquals("Unexpected lock count", 1, locks.size());
     //note that "update" uses dynamic partitioning thus lock is on the table not partition
@@ -953,8 +953,8 @@ public class TestDbTxnManager2 {
     checkCmdOnDriver(driver.run("insert into TAB_PART partition(p='blah') values(1,2)"));
     HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
 
-    long txnId = txnMgr.openTxn("Known");
-    long txnId2 = txnMgr2.openTxn("Unknown");
+    long txnId = txnMgr.openTxn(ctx, "Known");
+    long txnId2 = txnMgr2.openTxn(ctx, "Unknown");
     checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
     txnMgr.acquireLocks(driver.getPlan(), ctx, "Known");
     List<ShowLocksResponseElement> locks = getLocks(txnMgr);
@@ -1003,7 +1003,7 @@ public class TestDbTxnManager2 {
       "clustered by (a) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')");
     checkCmdOnDriver(cpr);
     
-    txnMgr.openTxn("Long Running");
+    txnMgr.openTxn(ctx, "Long Running");
     checkCmdOnDriver(driver.compileAndRespond("select a from  TAB_PART where p = 'blah'"));
     txnMgr.acquireLocks(driver.getPlan(), ctx, "Long Running");
     List<ShowLocksResponseElement> locks = getLocks(txnMgr);
@@ -1014,7 +1014,7 @@ public class TestDbTxnManager2 {
     checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB_PART", null, locks);
 
     HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
-    txnMgr2.openTxn("Short Running");
+    txnMgr2.openTxn(ctx, "Short Running");
     checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 7 where p = 'blah'"));//no such partition
     txnMgr2.acquireLocks(driver.getPlan(), ctx, "Short Running");
     locks = getLocks(txnMgr);
@@ -1031,7 +1031,7 @@ public class TestDbTxnManager2 {
     //Short Running updated nothing, so we expect 0 rows in WRITE_SET
     Assert.assertEquals( 0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
 
-    txnMgr2.openTxn("T3");
+    txnMgr2.openTxn(ctx, "T3");
     checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 7 where p = 'two'"));//pretend this partition exists
     txnMgr2.acquireLocks(driver.getPlan(), ctx, "T3");
     locks = getLocks(txnMgr);
@@ -1080,8 +1080,8 @@ public class TestDbTxnManager2 {
     checkCmdOnDriver(driver.run("insert into TAB_PART partition(p='blah') values(1,2)"));
     HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
 
-    txnMgr.openTxn("Known");
-    long txnId = txnMgr2.openTxn("Unknown");
+    txnMgr.openTxn(ctx, "Known");
+    long txnId = txnMgr2.openTxn(ctx, "Unknown");
     checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
     txnMgr.acquireLocks(driver.getPlan(), ctx, "Known");
     List<ShowLocksResponseElement> locks = getLocks(txnMgr);
@@ -1119,7 +1119,7 @@ public class TestDbTxnManager2 {
     Assert.assertEquals("Unexpected lock count", 1, locks.size());
     checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB2", null, locks);
     HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
-    txnMgr2.openTxn("Horton");
+    txnMgr2.openTxn(ctx, "Horton");
     checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 17 where a = 101"));
     txnMgr2.acquireLocks(driver.getPlan(), ctx, "Horton");
     Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
@@ -1151,7 +1151,7 @@ public class TestDbTxnManager2 {
     HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
 
     //test with predicates such that partition pruning works
-    txnMgr2.openTxn("T2");
+    txnMgr2.openTxn(ctx, "T2");
     checkCmdOnDriver(driver.compileAndRespond("update tab2 set b = 7 where p='two'"));
     txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
     List<ShowLocksResponseElement> locks = getLocks(txnMgr2);
@@ -1159,7 +1159,7 @@ public class TestDbTxnManager2 {
     checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=two", locks);
 
     //now start concurrent txn
-    txnMgr.openTxn("T3");
+    txnMgr.openTxn(ctx, "T3");
     checkCmdOnDriver(driver.compileAndRespond("update tab2 set b = 7 where p='one'"));
     ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
     locks = getLocks(txnMgr);
@@ -1199,7 +1199,7 @@ public class TestDbTxnManager2 {
       "clustered by (a) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')");
     checkCmdOnDriver(cpr);
     checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:4
-    txnMgr2.openTxn("T5");
+    txnMgr2.openTxn(ctx, "T5");
     checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1"));
     txnMgr2.acquireLocks(driver.getPlan(), ctx, "T5");
     locks = getLocks(txnMgr2);
@@ -1208,7 +1208,7 @@ public class TestDbTxnManager2 {
     checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks);
 
     //now start concurrent txn
-    txnMgr.openTxn("T6");
+    txnMgr.openTxn(ctx, "T6");
     checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b = 2"));
     ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T6", false);
     locks = getLocks(txnMgr);
@@ -1256,7 +1256,7 @@ public class TestDbTxnManager2 {
     checkCmdOnDriver(cpr);
     checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1
     HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
-    txnMgr2.openTxn("T2");
+    txnMgr2.openTxn(ctx, "T2");
     checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1"));
     txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
     List<ShowLocksResponseElement> locks = getLocks(txnMgr2);
@@ -1265,7 +1265,7 @@ public class TestDbTxnManager2 {
     checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks);
 
     //now start concurrent txn
-    txnMgr.openTxn("T3");
+    txnMgr.openTxn(ctx, "T3");
     checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where p='two'"));
     ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
     locks = getLocks(txnMgr);
@@ -1310,7 +1310,7 @@ public class TestDbTxnManager2 {
     checkCmdOnDriver(cpr);
     checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1
     HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
-    txnMgr2.openTxn("T2");
+    txnMgr2.openTxn(ctx, "T2");
     checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1"));
     txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
     List<ShowLocksResponseElement> locks = getLocks(txnMgr2);
@@ -1319,7 +1319,7 @@ public class TestDbTxnManager2 {
     checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks);
 
     //now start concurrent txn
-    txnMgr.openTxn("T3");
+    txnMgr.openTxn(ctx, "T3");
     checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2"));
     ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
     locks = getLocks(txnMgr);
@@ -1370,7 +1370,7 @@ public class TestDbTxnManager2 {
     checkCmdOnDriver(cpr);
     checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1
     HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
-    txnMgr2.openTxn("T2");
+    txnMgr2.openTxn(ctx, "T2");
     checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=2"));
     txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
     List<ShowLocksResponseElement> locks = getLocks(txnMgr2);
@@ -1379,7 +1379,7 @@ public class TestDbTxnManager2 {
     checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks);
 
     //now start concurrent txn
-    txnMgr.openTxn("T3");
+    txnMgr.openTxn(ctx, "T3");
     checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2"));
     ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
     locks = getLocks(txnMgr);
@@ -1432,7 +1432,7 @@ public class TestDbTxnManager2 {
     checkCmdOnDriver(cpr);
     checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1
     HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
-    txnMgr2.openTxn("T2");
+    txnMgr2.openTxn(ctx, "T2");
     checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where b=2"));
     txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
     List<ShowLocksResponseElement> locks = getLocks(txnMgr2);
@@ -1441,7 +1441,7 @@ public class TestDbTxnManager2 {
     checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks);
 
     //now start concurrent txn
-    txnMgr.openTxn("T3");
+    txnMgr.openTxn(ctx, "T3");
     checkCmdOnDriver(driver.compileAndRespond("select * from tab1 where b=1 and p='one'"));
     ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
     checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2"));
@@ -1598,7 +1598,7 @@ public class TestDbTxnManager2 {
       "(9,100,1,2),      (3,4,1,2),               (5,13,1,3),       (7,8,2,2), (14,15,2,1)"));
     
 
-    long txnId1 = txnMgr.openTxn("T1");
+    long txnId1 = txnMgr.openTxn(ctx, "T1");
     checkCmdOnDriver(driver.compileAndRespond("merge into target t using source s on t.a=s.b " +
       "when matched and t.a=5 then update set b=s.b " + //updates p=1/q=3
       "when matched and t.a in (3,7) then delete " + //deletes from p=1/q=2, p=2/q=2
@@ -1614,7 +1614,7 @@ public class TestDbTxnManager2 {
 
     //start concurrent txn
     DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
-    long txnId2 = txnMgr2.openTxn("T2");
+    long txnId2 = txnMgr2.openTxn(ctx, "T2");
     checkCmdOnDriver(driver.compileAndRespond("merge into target t using source2 s on t.a=s.b " +
       "when matched and t.a=" + (cc ? 5 : 9) + " then update set b=s.b " + //if conflict updates p=1/q=3 else update p=1/q=2
       "when matched and t.a in (3,7) then delete " + //deletes from p=1/q=2, p=2/q=2
@@ -1806,7 +1806,7 @@ public class TestDbTxnManager2 {
     checkCmdOnDriver(driver.run("insert into target values (1,2), (3,4), (5,6), (7,8)"));
     checkCmdOnDriver(driver.run("create table source (a int, b int)"));
     
-    long txnid1 = txnMgr.openTxn("T1");
+    long txnid1 = txnMgr.openTxn(ctx, "T1");
     if(causeConflict) {
       checkCmdOnDriver(driver.compileAndRespond("update target set b = 2 where a=1"));
     }
@@ -1827,7 +1827,7 @@ public class TestDbTxnManager2 {
 
     DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
     //start a 2nd (overlapping) txn
-    long txnid2 = txnMgr2.openTxn("T2");
+    long txnid2 = txnMgr2.openTxn(ctx, "T2");
     checkCmdOnDriver(driver.compileAndRespond("merge into target t using source s " +
       "on t.a=s.a " +
       "when matched then delete " +
@@ -1903,7 +1903,7 @@ public class TestDbTxnManager2 {
     checkCmdOnDriver(driver.run("create table target (a int, b int) " +
       "partitioned by (p int, q int) clustered by (a) into 2  buckets " +
       "stored as orc TBLPROPERTIES ('transactional'='true')"));
-    long txnid1 = txnMgr.openTxn("T1");
+    long txnid1 = txnMgr.openTxn(ctx, "T1");
     checkCmdOnDriver(driver.compileAndRespond("insert into target partition(p=1,q) values (1,2,2), (3,4,2), (5,6,3), (7,8,2)"));
     txnMgr.acquireLocks(driver.getPlan(), ctx, "T1");
     List<ShowLocksResponseElement> locks = getLocks(txnMgr, true);
@@ -1935,7 +1935,7 @@ public class TestDbTxnManager2 {
       TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + (txnid1 + 1)));
 
 
-    long txnid2 = txnMgr.openTxn("T1");
+    long txnid2 = txnMgr.openTxn(ctx, "T1");
     checkCmdOnDriver(driver.compileAndRespond("insert into target partition(p=1,q) values (10,2,2), (30,4,2), (50,6,3), (70,8,2)"));
     txnMgr.acquireLocks(driver.getPlan(), ctx, "T1");
     locks = getLocks(txnMgr, true);
@@ -1974,7 +1974,7 @@ public class TestDbTxnManager2 {
     checkCmdOnDriver(driver.run("insert into target partition(p,q) values (1,2,1,2), (3,4,1,2), (5,6,1,3), (7,8,2,2)"));
     checkCmdOnDriver(driver.run("create table source (a1 int, b1 int, p1 int, q1 int)"));
 
-    long txnId1 = txnMgr.openTxn("T1");
+    long txnId1 = txnMgr.openTxn(ctx, "T1");
     checkCmdOnDriver(driver.compileAndRespond("update target set b = 2 where p=1"));
     txnMgr.acquireLocks(driver.getPlan(), ctx, "T1");
     List<ShowLocksResponseElement> locks = getLocks(txnMgr, true);
@@ -1984,7 +1984,7 @@ public class TestDbTxnManager2 {
 
     DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
     //start a 2nd (overlapping) txn
-    long txnid2 = txnMgr2.openTxn("T2");
+    long txnid2 = txnMgr2.openTxn(ctx, "T2");
     checkCmdOnDriver(driver.compileAndRespond("merge into target using source " +
       "on target.p=source.p1 and target.a=source.a1 " +
       "when matched then update set b = 11 " +

http://git-wip-us.apache.org/repos/asf/hive/blob/ee35ccb1/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out b/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
index 9862887..a9814f3 100644
--- a/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
+++ b/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
@@ -2,17 +2,17 @@ PREHOOK: query: show locks
 PREHOOK: type: SHOWLOCKS
 POSTHOOK: query: show locks
 POSTHOOK: type: SHOWLOCKS
-Lock ID	Database	Table	Partition	State	Blocked By	Type	Transaction ID	Last Hearbeat	Acquired At	User	Hostname	Agent Info
+Lock ID	Database	Table	Partition	State	Blocked By	Type	Transaction ID	Last Heartbeat	Acquired At	User	Hostname	Agent Info
 PREHOOK: query: show locks extended
 PREHOOK: type: SHOWLOCKS
 POSTHOOK: query: show locks extended
 POSTHOOK: type: SHOWLOCKS
-Lock ID	Database	Table	Partition	State	Blocked By	Type	Transaction ID	Last Hearbeat	Acquired At	User	Hostname	Agent Info
+Lock ID	Database	Table	Partition	State	Blocked By	Type	Transaction ID	Last Heartbeat	Acquired At	User	Hostname	Agent Info
 PREHOOK: query: show locks default
 PREHOOK: type: SHOWLOCKS
 POSTHOOK: query: show locks default
 POSTHOOK: type: SHOWLOCKS
-Lock ID	Database	Table	Partition	State	Blocked By	Type	Transaction ID	Last Hearbeat	Acquired At	User	Hostname	Agent Info
+Lock ID	Database	Table	Partition	State	Blocked By	Type	Transaction ID	Last Heartbeat	Acquired At	User	Hostname	Agent Info
 PREHOOK: query: show transactions
 PREHOOK: type: SHOW TRANSACTIONS
 POSTHOOK: query: show transactions
@@ -30,27 +30,27 @@ PREHOOK: query: show locks database default
 PREHOOK: type: SHOWLOCKS
 POSTHOOK: query: show locks database default
 POSTHOOK: type: SHOWLOCKS
-Lock ID	Database	Table	Partition	State	Blocked By	Type	Transaction ID	Last Hearbeat	Acquired At	User	Hostname	Agent Info
+Lock ID	Database	Table	Partition	State	Blocked By	Type	Transaction ID	Last Heartbeat	Acquired At	User	Hostname	Agent Info
 PREHOOK: query: show locks partitioned_acid_table
 PREHOOK: type: SHOWLOCKS
 POSTHOOK: query: show locks partitioned_acid_table
 POSTHOOK: type: SHOWLOCKS
-Lock ID	Database	Table	Partition	State	Blocked By	Type	Transaction ID	Last Hearbeat	Acquired At	User	Hostname	Agent Info
+Lock ID	Database	Table	Partition	State	Blocked By	Type	Transaction ID	Last Heartbeat	Acquired At	User	Hostname	Agent Info
 PREHOOK: query: show locks partitioned_acid_table extended
 PREHOOK: type: SHOWLOCKS
 POSTHOOK: query: show locks partitioned_acid_table extended
 POSTHOOK: type: SHOWLOCKS
-Lock ID	Database	Table	Partition	State	Blocked By	Type	Transaction ID	Last Hearbeat	Acquired At	User	Hostname	Agent Info
+Lock ID	Database	Table	Partition	State	Blocked By	Type	Transaction ID	Last Heartbeat	Acquired At	User	Hostname	Agent Info
 PREHOOK: query: show locks partitioned_acid_table partition (p='abc')
 PREHOOK: type: SHOWLOCKS
 POSTHOOK: query: show locks partitioned_acid_table partition (p='abc')
 POSTHOOK: type: SHOWLOCKS
-Lock ID	Database	Table	Partition	State	Blocked By	Type	Transaction ID	Last Hearbeat	Acquired At	User	Hostname	Agent Info
+Lock ID	Database	Table	Partition	State	Blocked By	Type	Transaction ID	Last Heartbeat	Acquired At	User	Hostname	Agent Info
 PREHOOK: query: show locks partitioned_acid_table partition (p='abc') extended
 PREHOOK: type: SHOWLOCKS
 POSTHOOK: query: show locks partitioned_acid_table partition (p='abc') extended
 POSTHOOK: type: SHOWLOCKS
-Lock ID	Database	Table	Partition	State	Blocked By	Type	Transaction ID	Last Hearbeat	Acquired At	User	Hostname	Agent Info
+Lock ID	Database	Table	Partition	State	Blocked By	Type	Transaction ID	Last Heartbeat	Acquired At	User	Hostname	Agent Info
 PREHOOK: query: insert into partitioned_acid_table partition(p='abc') values(1,2)
 PREHOOK: type: QUERY
 PREHOOK: Output: default@partitioned_acid_table@p=abc