You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2016/12/22 19:06:48 UTC
hive git commit: HIVE-15376 : Improve heartbeater scheduling for
transactions (Wei Zheng, reviewed by Eugene Koifman)
Repository: hive
Updated Branches:
refs/heads/master 71f250db3 -> ee35ccb19
HIVE-15376 : Improve heartbeater scheduling for transactions (Wei Zheng, reviewed by Eugene Koifman)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ee35ccb1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ee35ccb1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ee35ccb1
Branch: refs/heads/master
Commit: ee35ccb19f04f4437c835a7ac960e6b9762d5164
Parents: 71f250d
Author: Wei Zheng <we...@apache.org>
Authored: Thu Dec 22 11:06:36 2016 -0800
Committer: Wei Zheng <we...@apache.org>
Committed: Thu Dec 22 11:06:36 2016 -0800
----------------------------------------------------------------------
.../java/org/apache/hadoop/hive/ql/Driver.java | 2 +-
.../hadoop/hive/ql/lockmgr/DbTxnManager.java | 12 ++-
.../hadoop/hive/ql/lockmgr/DummyTxnManager.java | 2 +-
.../hadoop/hive/ql/lockmgr/HiveTxnManager.java | 5 +-
.../hive/ql/lockmgr/TestDbTxnManager.java | 55 +++++++++-----
.../hive/ql/lockmgr/TestDbTxnManager2.java | 80 ++++++++++----------
.../clientpositive/dbtxnmgr_showlocks.q.out | 16 ++--
7 files changed, 99 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/ee35ccb1/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 1bc0956..fd6020b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1102,7 +1102,7 @@ public class Driver implements CommandProcessor {
throw new RuntimeException("Already have an open transaction txnid:" + txnMgr.getCurrentTxnId());
}
// We are writing to tables in an ACID compliant way, so we need to open a transaction
- txnMgr.openTxn(userFromUGI);
+ txnMgr.openTxn(ctx, userFromUGI);
initiatingTransaction = true;
}
else {
http://git-wip-us.apache.org/repos/asf/hive/blob/ee35ccb1/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index a79c106..28ca77c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -135,7 +135,12 @@ public class DbTxnManager extends HiveTxnManagerImpl {
}
@Override
- public long openTxn(String user) throws LockException {
+ public long openTxn(Context ctx, String user) throws LockException {
+ return openTxn(ctx, user, 0);
+ }
+
+ @VisibleForTesting
+ long openTxn(Context ctx, String user, long delay) throws LockException {
//todo: why don't we lock the snapshot here??? Instead of having client make an explicit call
//whenever it chooses
init();
@@ -146,6 +151,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
txnId = client.openTxn(user);
statementId = 0;
LOG.debug("Opened " + JavaUtils.txnIdToString(txnId));
+ ctx.setHeartbeater(startHeartbeat(delay));
return txnId;
} catch (TException e) {
throw new LockException(e, ErrorMsg.METASTORE_COMMUNICATION_FAILED);
@@ -353,7 +359,9 @@ public class DbTxnManager extends HiveTxnManagerImpl {
@VisibleForTesting
void acquireLocksWithHeartbeatDelay(QueryPlan plan, Context ctx, String username, long delay) throws LockException {
LockState ls = acquireLocks(plan, ctx, username, true);
- if (ls != null) { // If there's no lock, we don't need to do heartbeat
+ if (ls != null && !isTxnOpen()) { // If there's no lock, we don't need to do heartbeat
+ // Start heartbeat for read-only queries which don't open transactions but requires locks.
+ // For those that require transactions, the heartbeat has already been started in openTxn.
ctx.setHeartbeater(startHeartbeat(delay));
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ee35ccb1/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
index f001f59..24fbd9a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
@@ -49,7 +49,7 @@ class DummyTxnManager extends HiveTxnManagerImpl {
private HiveLockManager lockMgr;
@Override
- public long openTxn(String user) throws LockException {
+ public long openTxn(Context ctx, String user) throws LockException {
// No-op
return 0L;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ee35ccb1/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index 5b9ad60..ce220a2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -38,11 +38,12 @@ public interface HiveTxnManager {
/**
* Open a new transaction.
+ * @param ctx Context for this query
* @param user Hive user who is opening this transaction.
* @return The new transaction id
* @throws LockException if a transaction is already open.
*/
- long openTxn(String user) throws LockException;
+ long openTxn(Context ctx, String user) throws LockException;
/**
* Get the lock manager. This must be used rather than instantiating an
@@ -55,7 +56,7 @@ public interface HiveTxnManager {
/**
* Acquire all of the locks needed by a query. If used with a query that
- * requires transactions, this should be called after {@link #openTxn(String)}.
+ * requires transactions, this should be called after {@link #openTxn(Context, String)}.
* A list of acquired locks will be stored in the
* {@link org.apache.hadoop.hive.ql.Context} object and can be retrieved
* via {@link org.apache.hadoop.hive.ql.Context#getHiveLocks}.
http://git-wip-us.apache.org/repos/asf/hive/blob/ee35ccb1/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index 460bad5..3f99571 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -139,7 +139,7 @@ public class TestDbTxnManager {
public void testSingleWriteTable() throws Exception {
WriteEntity we = addTableOutput(WriteEntity.WriteType.INSERT);
QueryPlan qp = new MockQueryPlan(this);
- txnMgr.openTxn("fred");
+ txnMgr.openTxn(ctx, "fred");
txnMgr.acquireLocks(qp, ctx, "fred");
List<HiveLock> locks = ctx.getHiveLocks();
Assert.assertEquals(1, locks.size());
@@ -155,7 +155,7 @@ public class TestDbTxnManager {
public void testSingleWritePartition() throws Exception {
WriteEntity we = addPartitionOutput(newTable(true), WriteEntity.WriteType.INSERT);
QueryPlan qp = new MockQueryPlan(this);
- txnMgr.openTxn("fred");
+ txnMgr.openTxn(ctx, "fred");
txnMgr.acquireLocks(qp, ctx, "fred");
List<HiveLock> locks = ctx.getHiveLocks();
Assert.assertEquals(1, locks.size());
@@ -170,7 +170,7 @@ public class TestDbTxnManager {
public void testWriteDynamicPartition() throws Exception {
WriteEntity we = addDynamicPartitionedOutput(newTable(true), WriteEntity.WriteType.INSERT);
QueryPlan qp = new MockQueryPlan(this);
- txnMgr.openTxn("fred");
+ txnMgr.openTxn(ctx, "fred");
txnMgr.acquireLocks(qp, ctx, "fred");
List<HiveLock> locks = ctx.getHiveLocks();
Assert.assertEquals(1, locks.size());
@@ -213,7 +213,7 @@ public class TestDbTxnManager {
public void testExceptions() throws Exception {
addPartitionOutput(newTable(true), WriteEntity.WriteType.INSERT);
QueryPlan qp = new MockQueryPlan(this);
- txnMgr.openTxn("NicholasII");
+ ((DbTxnManager) txnMgr).openTxn(ctx, "NicholasII", HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2);
Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS));
runReaper();
LockException exception = null;
@@ -227,7 +227,7 @@ public class TestDbTxnManager {
Assert.assertEquals("Wrong Exception1", ErrorMsg.TXN_ABORTED, exception.getCanonicalErrorMsg());
exception = null;
- txnMgr.openTxn("AlexanderIII");
+ ((DbTxnManager) txnMgr).openTxn(ctx, "AlexanderIII", HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2);
Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS));
runReaper();
try {
@@ -245,26 +245,43 @@ public class TestDbTxnManager {
addPartitionInput(newTable(true));
QueryPlan qp = new MockQueryPlan(this);
//make sure it works with nothing to expire
- expireLocks(txnMgr, 0);
+ testLockExpiration(txnMgr, 0, true);
+
//create a few read locks, all on the same resource
for(int i = 0; i < 5; i++) {
((DbTxnManager)txnMgr).acquireLocks(qp, ctx, "PeterI" + i, true); // No heartbeat
}
- expireLocks(txnMgr, 5);
+ testLockExpiration(txnMgr, 5, true);
+
//create a lot of locks
for(int i = 0; i < TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17; i++) {
((DbTxnManager)txnMgr).acquireLocks(qp, ctx, "PeterI" + i, true); // No heartbeat
}
- expireLocks(txnMgr, TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17);
+ testLockExpiration(txnMgr, TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 17, true);
+
+ // Create a lock, but send the heartbeat with a long delay. The lock will get expired.
+ ((DbTxnManager)txnMgr).acquireLocksWithHeartbeatDelay(qp, ctx, "bob",
+ HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 10);
+ testLockExpiration(txnMgr, 1, true);
+
+ // Create a lock and trigger a heartbeat. With heartbeat, the lock won't expire.
+ txnMgr.acquireLocks(qp, ctx, "peter");
+ testLockExpiration(txnMgr, 1, false);
}
- private void expireLocks(HiveTxnManager txnMgr, int numLocksBefore) throws Exception {
+
+ private void testLockExpiration(HiveTxnManager txnMgr, int numLocksBefore, boolean shouldExpire) throws Exception {
DbLockManager lockManager = (DbLockManager)txnMgr.getLockManager();
ShowLocksResponse resp = lockManager.getLocks();
Assert.assertEquals("Wrong number of locks before expire", numLocksBefore, resp.getLocks().size());
Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS));
runReaper();
resp = lockManager.getLocks();
- Assert.assertEquals("Expected all locks to expire", 0, resp.getLocks().size());
+ if (shouldExpire) {
+ Assert.assertEquals("Expected all locks to expire", 0, resp.getLocks().size());
+ lockManager.clearLocalLockRecords();
+ } else {
+ Assert.assertEquals("No lock should expire because there is heartbeating", numLocksBefore, resp.getLocks().size());
+ }
}
@Test
@@ -275,7 +292,7 @@ public class TestDbTxnManager {
addPartitionInput(t);
WriteEntity we = addTableOutput(WriteEntity.WriteType.INSERT);
QueryPlan qp = new MockQueryPlan(this);
- txnMgr.openTxn("fred");
+ txnMgr.openTxn(ctx, "fred");
txnMgr.acquireLocks(qp, ctx, "fred");
List<HiveLock> locks = ctx.getHiveLocks();
Assert.assertEquals(1, locks.size());
@@ -290,7 +307,7 @@ public class TestDbTxnManager {
public void testUpdate() throws Exception {
WriteEntity we = addTableOutput(WriteEntity.WriteType.UPDATE);
QueryPlan qp = new MockQueryPlan(this);
- txnMgr.openTxn("fred");
+ txnMgr.openTxn(ctx, "fred");
txnMgr.acquireLocks(qp, ctx, "fred");
List<HiveLock> locks = ctx.getHiveLocks();
Assert.assertEquals(1, locks.size());
@@ -305,7 +322,7 @@ public class TestDbTxnManager {
public void testDelete() throws Exception {
WriteEntity we = addTableOutput(WriteEntity.WriteType.DELETE);
QueryPlan qp = new MockQueryPlan(this);
- txnMgr.openTxn("fred");
+ txnMgr.openTxn(ctx, "fred");
txnMgr.acquireLocks(qp, ctx, "fred");
List<HiveLock> locks = ctx.getHiveLocks();
Assert.assertEquals(1, locks.size());
@@ -320,7 +337,7 @@ public class TestDbTxnManager {
public void testRollback() throws Exception {
WriteEntity we = addTableOutput(WriteEntity.WriteType.DELETE);
QueryPlan qp = new MockQueryPlan(this);
- txnMgr.openTxn("fred");
+ txnMgr.openTxn(ctx, "fred");
txnMgr.acquireLocks(qp, ctx, "fred");
List<HiveLock> locks = ctx.getHiveLocks();
Assert.assertEquals(1, locks.size());
@@ -404,7 +421,7 @@ public class TestDbTxnManager {
QueryPlan qp = new MockQueryPlan(this);
// Case 1: If there's no delay for the heartbeat, txn should be able to commit
- txnMgr.openTxn("fred");
+ txnMgr.openTxn(ctx, "fred");
txnMgr.acquireLocks(qp, ctx, "fred"); // heartbeat started..
runReaper();
try {
@@ -417,10 +434,10 @@ public class TestDbTxnManager {
// Case 2: If there's delay for the heartbeat, but the delay is within the reaper's tolerance,
// then txt should be able to commit
- txnMgr.openTxn("tom");
// Start the heartbeat after a delay, which is shorter than the HIVE_TXN_TIMEOUT
- ((DbTxnManager) txnMgr).acquireLocksWithHeartbeatDelay(qp, ctx, "tom",
+ ((DbTxnManager) txnMgr).openTxn(ctx, "tom",
HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) / 2);
+ txnMgr.acquireLocks(qp, ctx, "tom");
runReaper();
try {
txnMgr.commitTxn();
@@ -433,9 +450,9 @@ public class TestDbTxnManager {
// Case 3: If there's delay for the heartbeat, and the delay is long enough to trigger the reaper,
// then the txn will time out and be aborted.
// Here we just don't send the heartbeat at all - an infinite delay.
- txnMgr.openTxn("jerry");
// Start the heartbeat after a delay, which exceeds the HIVE_TXN_TIMEOUT
- ((DbTxnManager) txnMgr).acquireLocks(qp, ctx, "jerry", true);
+ ((DbTxnManager) txnMgr).openTxn(ctx, "jerry", HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2);
+ txnMgr.acquireLocks(qp, ctx, "jerry");
Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS));
runReaper();
try {
http://git-wip-us.apache.org/repos/asf/hive/blob/ee35ccb1/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 3c9358d..725558f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -107,7 +107,7 @@ public class TestDbTxnManager2 {
checkCmdOnDriver(driver.run("create table if not exists R (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"));
checkCmdOnDriver(driver.compileAndRespond("delete from S where a in (select a from T where b = 1)"));
- txnMgr.openTxn("one");
+ txnMgr.openTxn(ctx, "one");
txnMgr.acquireLocks(driver.getPlan(), ctx, "one");
List<ShowLocksResponseElement> locks = getLocks();
Assert.assertEquals("Unexpected lock count", 2, locks.size());
@@ -116,7 +116,7 @@ public class TestDbTxnManager2 {
txnMgr.rollbackTxn();
checkCmdOnDriver(driver.compileAndRespond("update S set a = 7 where a in (select a from T where b = 1)"));
- txnMgr.openTxn("one");
+ txnMgr.openTxn(ctx, "one");
txnMgr.acquireLocks(driver.getPlan(), ctx, "one");
locks = getLocks();
Assert.assertEquals("Unexpected lock count", 2, locks.size());
@@ -125,7 +125,7 @@ public class TestDbTxnManager2 {
txnMgr.rollbackTxn();
checkCmdOnDriver(driver.compileAndRespond("insert into R select * from S where a in (select a from T where b = 1)"));
- txnMgr.openTxn("three");
+ txnMgr.openTxn(ctx, "three");
txnMgr.acquireLocks(driver.getPlan(), ctx, "three");
locks = getLocks();
Assert.assertEquals("Unexpected lock count", 3, locks.size());
@@ -227,7 +227,7 @@ public class TestDbTxnManager2 {
checkCmdOnDriver(cpr);
cpr = driver.compileAndRespond("update temp.T7 set a = 5 where b = 6");
checkCmdOnDriver(cpr);
- txnMgr.openTxn("Fifer");
+ txnMgr.openTxn(ctx, "Fifer");
txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");
checkCmdOnDriver(driver.compileAndRespond("drop database if exists temp"));
HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
@@ -253,12 +253,12 @@ public class TestDbTxnManager2 {
checkCmdOnDriver(cpr);
cpr = driver.compileAndRespond("delete from T8 where b = 89");
checkCmdOnDriver(cpr);
- txnMgr.openTxn("Fifer");
+ txnMgr.openTxn(ctx, "Fifer");
txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");//gets SS lock on T8
cpr = driver.compileAndRespond("select a from T8");//gets S lock on T8
checkCmdOnDriver(cpr);
HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
- txnMgr2.openTxn("Fiddler");
+ txnMgr2.openTxn(ctx, "Fiddler");
txnMgr2.acquireLocks(driver.getPlan(), ctx, "Fiddler");
checkCmdOnDriver(driver.compileAndRespond("update T8 set a = 1 where b = 1"));
((DbTxnManager) txnMgr2).acquireLocks(driver.getPlan(), ctx, "Practical", false);//waits for SS lock on T8 from fifer
@@ -662,7 +662,7 @@ public class TestDbTxnManager2 {
"clustered by (na) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='false')"));
checkCmdOnDriver(driver.run("insert into tab_acid partition(p) (a,b,p) values(1,2,'foo'),(3,4,'bar')"));
checkCmdOnDriver(driver.run("insert into tab_not_acid partition(np) (na,nb,np) values(1,2,'blah'),(3,4,'doh')"));
- txnMgr.openTxn("T1");
+ txnMgr.openTxn(ctx, "T1");
checkCmdOnDriver(driver.compileAndRespond("select * from tab_acid inner join tab_not_acid on a = na"));
txnMgr.acquireLocks(driver.getPlan(), ctx, "T1");
List<ShowLocksResponseElement> locks = getLocks(txnMgr, true);
@@ -675,7 +675,7 @@ public class TestDbTxnManager2 {
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "tab_not_acid", "np=doh", locks);
HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
- txnMgr2.openTxn("T2");
+ txnMgr2.openTxn(ctx, "T2");
checkCmdOnDriver(driver.compileAndRespond("insert into tab_not_acid partition(np='doh') values(5,6)"));
LockState ls = ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "T2", false);
locks = getLocks(txnMgr2, true);
@@ -899,12 +899,12 @@ public class TestDbTxnManager2 {
checkCmdOnDriver(cpr);
checkCmdOnDriver(driver.compileAndRespond("select * from TAB_PART"));
- txnMgr.openTxn("Nicholas");
+ txnMgr.openTxn(ctx, "Nicholas");
checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
txnMgr.acquireLocks(driver.getPlan(), ctx, "Nicholas");
HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
txnMgr.commitTxn();
- txnMgr2.openTxn("Alexandra");
+ txnMgr2.openTxn(ctx, "Alexandra");
checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
txnMgr2.acquireLocks(driver.getPlan(), ctx, "Nicholas");
txnMgr2.commitTxn();
@@ -928,10 +928,10 @@ public class TestDbTxnManager2 {
checkCmdOnDriver(cpr);
HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
- txnMgr.openTxn("Peter");
+ txnMgr.openTxn(ctx, "Peter");
checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
txnMgr.acquireLocks(driver.getPlan(), ctx, "Peter");
- txnMgr2.openTxn("Catherine");
+ txnMgr2.openTxn(ctx, "Catherine");
List<ShowLocksResponseElement> locks = getLocks(txnMgr);
Assert.assertEquals("Unexpected lock count", 1, locks.size());
//note that "update" uses dynamic partitioning thus lock is on the table not partition
@@ -953,8 +953,8 @@ public class TestDbTxnManager2 {
checkCmdOnDriver(driver.run("insert into TAB_PART partition(p='blah') values(1,2)"));
HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
- long txnId = txnMgr.openTxn("Known");
- long txnId2 = txnMgr2.openTxn("Unknown");
+ long txnId = txnMgr.openTxn(ctx, "Known");
+ long txnId2 = txnMgr2.openTxn(ctx, "Unknown");
checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
txnMgr.acquireLocks(driver.getPlan(), ctx, "Known");
List<ShowLocksResponseElement> locks = getLocks(txnMgr);
@@ -1003,7 +1003,7 @@ public class TestDbTxnManager2 {
"clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
checkCmdOnDriver(cpr);
- txnMgr.openTxn("Long Running");
+ txnMgr.openTxn(ctx, "Long Running");
checkCmdOnDriver(driver.compileAndRespond("select a from TAB_PART where p = 'blah'"));
txnMgr.acquireLocks(driver.getPlan(), ctx, "Long Running");
List<ShowLocksResponseElement> locks = getLocks(txnMgr);
@@ -1014,7 +1014,7 @@ public class TestDbTxnManager2 {
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB_PART", null, locks);
HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
- txnMgr2.openTxn("Short Running");
+ txnMgr2.openTxn(ctx, "Short Running");
checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 7 where p = 'blah'"));//no such partition
txnMgr2.acquireLocks(driver.getPlan(), ctx, "Short Running");
locks = getLocks(txnMgr);
@@ -1031,7 +1031,7 @@ public class TestDbTxnManager2 {
//Short Running updated nothing, so we expect 0 rows in WRITE_SET
Assert.assertEquals( 0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
- txnMgr2.openTxn("T3");
+ txnMgr2.openTxn(ctx, "T3");
checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 7 where p = 'two'"));//pretend this partition exists
txnMgr2.acquireLocks(driver.getPlan(), ctx, "T3");
locks = getLocks(txnMgr);
@@ -1080,8 +1080,8 @@ public class TestDbTxnManager2 {
checkCmdOnDriver(driver.run("insert into TAB_PART partition(p='blah') values(1,2)"));
HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
- txnMgr.openTxn("Known");
- long txnId = txnMgr2.openTxn("Unknown");
+ txnMgr.openTxn(ctx, "Known");
+ long txnId = txnMgr2.openTxn(ctx, "Unknown");
checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
txnMgr.acquireLocks(driver.getPlan(), ctx, "Known");
List<ShowLocksResponseElement> locks = getLocks(txnMgr);
@@ -1119,7 +1119,7 @@ public class TestDbTxnManager2 {
Assert.assertEquals("Unexpected lock count", 1, locks.size());
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB2", null, locks);
HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
- txnMgr2.openTxn("Horton");
+ txnMgr2.openTxn(ctx, "Horton");
checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 17 where a = 101"));
txnMgr2.acquireLocks(driver.getPlan(), ctx, "Horton");
Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
@@ -1151,7 +1151,7 @@ public class TestDbTxnManager2 {
HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
//test with predicates such that partition pruning works
- txnMgr2.openTxn("T2");
+ txnMgr2.openTxn(ctx, "T2");
checkCmdOnDriver(driver.compileAndRespond("update tab2 set b = 7 where p='two'"));
txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
List<ShowLocksResponseElement> locks = getLocks(txnMgr2);
@@ -1159,7 +1159,7 @@ public class TestDbTxnManager2 {
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=two", locks);
//now start concurrent txn
- txnMgr.openTxn("T3");
+ txnMgr.openTxn(ctx, "T3");
checkCmdOnDriver(driver.compileAndRespond("update tab2 set b = 7 where p='one'"));
((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
locks = getLocks(txnMgr);
@@ -1199,7 +1199,7 @@ public class TestDbTxnManager2 {
"clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
checkCmdOnDriver(cpr);
checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:4
- txnMgr2.openTxn("T5");
+ txnMgr2.openTxn(ctx, "T5");
checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1"));
txnMgr2.acquireLocks(driver.getPlan(), ctx, "T5");
locks = getLocks(txnMgr2);
@@ -1208,7 +1208,7 @@ public class TestDbTxnManager2 {
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks);
//now start concurrent txn
- txnMgr.openTxn("T6");
+ txnMgr.openTxn(ctx, "T6");
checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b = 2"));
((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T6", false);
locks = getLocks(txnMgr);
@@ -1256,7 +1256,7 @@ public class TestDbTxnManager2 {
checkCmdOnDriver(cpr);
checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1
HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
- txnMgr2.openTxn("T2");
+ txnMgr2.openTxn(ctx, "T2");
checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1"));
txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
List<ShowLocksResponseElement> locks = getLocks(txnMgr2);
@@ -1265,7 +1265,7 @@ public class TestDbTxnManager2 {
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks);
//now start concurrent txn
- txnMgr.openTxn("T3");
+ txnMgr.openTxn(ctx, "T3");
checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where p='two'"));
((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
locks = getLocks(txnMgr);
@@ -1310,7 +1310,7 @@ public class TestDbTxnManager2 {
checkCmdOnDriver(cpr);
checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1
HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
- txnMgr2.openTxn("T2");
+ txnMgr2.openTxn(ctx, "T2");
checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1"));
txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
List<ShowLocksResponseElement> locks = getLocks(txnMgr2);
@@ -1319,7 +1319,7 @@ public class TestDbTxnManager2 {
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks);
//now start concurrent txn
- txnMgr.openTxn("T3");
+ txnMgr.openTxn(ctx, "T3");
checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2"));
((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
locks = getLocks(txnMgr);
@@ -1370,7 +1370,7 @@ public class TestDbTxnManager2 {
checkCmdOnDriver(cpr);
checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1
HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
- txnMgr2.openTxn("T2");
+ txnMgr2.openTxn(ctx, "T2");
checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=2"));
txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
List<ShowLocksResponseElement> locks = getLocks(txnMgr2);
@@ -1379,7 +1379,7 @@ public class TestDbTxnManager2 {
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks);
//now start concurrent txn
- txnMgr.openTxn("T3");
+ txnMgr.openTxn(ctx, "T3");
checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2"));
((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
locks = getLocks(txnMgr);
@@ -1432,7 +1432,7 @@ public class TestDbTxnManager2 {
checkCmdOnDriver(cpr);
checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1
HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
- txnMgr2.openTxn("T2");
+ txnMgr2.openTxn(ctx, "T2");
checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where b=2"));
txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
List<ShowLocksResponseElement> locks = getLocks(txnMgr2);
@@ -1441,7 +1441,7 @@ public class TestDbTxnManager2 {
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks);
//now start concurrent txn
- txnMgr.openTxn("T3");
+ txnMgr.openTxn(ctx, "T3");
checkCmdOnDriver(driver.compileAndRespond("select * from tab1 where b=1 and p='one'"));
((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2"));
@@ -1598,7 +1598,7 @@ public class TestDbTxnManager2 {
"(9,100,1,2), (3,4,1,2), (5,13,1,3), (7,8,2,2), (14,15,2,1)"));
- long txnId1 = txnMgr.openTxn("T1");
+ long txnId1 = txnMgr.openTxn(ctx, "T1");
checkCmdOnDriver(driver.compileAndRespond("merge into target t using source s on t.a=s.b " +
"when matched and t.a=5 then update set b=s.b " + //updates p=1/q=3
"when matched and t.a in (3,7) then delete " + //deletes from p=1/q=2, p=2/q=2
@@ -1614,7 +1614,7 @@ public class TestDbTxnManager2 {
//start concurrent txn
DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
- long txnId2 = txnMgr2.openTxn("T2");
+ long txnId2 = txnMgr2.openTxn(ctx, "T2");
checkCmdOnDriver(driver.compileAndRespond("merge into target t using source2 s on t.a=s.b " +
"when matched and t.a=" + (cc ? 5 : 9) + " then update set b=s.b " + //if conflict updates p=1/q=3 else update p=1/q=2
"when matched and t.a in (3,7) then delete " + //deletes from p=1/q=2, p=2/q=2
@@ -1806,7 +1806,7 @@ public class TestDbTxnManager2 {
checkCmdOnDriver(driver.run("insert into target values (1,2), (3,4), (5,6), (7,8)"));
checkCmdOnDriver(driver.run("create table source (a int, b int)"));
- long txnid1 = txnMgr.openTxn("T1");
+ long txnid1 = txnMgr.openTxn(ctx, "T1");
if(causeConflict) {
checkCmdOnDriver(driver.compileAndRespond("update target set b = 2 where a=1"));
}
@@ -1827,7 +1827,7 @@ public class TestDbTxnManager2 {
DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
//start a 2nd (overlapping) txn
- long txnid2 = txnMgr2.openTxn("T2");
+ long txnid2 = txnMgr2.openTxn(ctx, "T2");
checkCmdOnDriver(driver.compileAndRespond("merge into target t using source s " +
"on t.a=s.a " +
"when matched then delete " +
@@ -1903,7 +1903,7 @@ public class TestDbTxnManager2 {
checkCmdOnDriver(driver.run("create table target (a int, b int) " +
"partitioned by (p int, q int) clustered by (a) into 2 buckets " +
"stored as orc TBLPROPERTIES ('transactional'='true')"));
- long txnid1 = txnMgr.openTxn("T1");
+ long txnid1 = txnMgr.openTxn(ctx, "T1");
checkCmdOnDriver(driver.compileAndRespond("insert into target partition(p=1,q) values (1,2,2), (3,4,2), (5,6,3), (7,8,2)"));
txnMgr.acquireLocks(driver.getPlan(), ctx, "T1");
List<ShowLocksResponseElement> locks = getLocks(txnMgr, true);
@@ -1935,7 +1935,7 @@ public class TestDbTxnManager2 {
TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + (txnid1 + 1)));
- long txnid2 = txnMgr.openTxn("T1");
+ long txnid2 = txnMgr.openTxn(ctx, "T1");
checkCmdOnDriver(driver.compileAndRespond("insert into target partition(p=1,q) values (10,2,2), (30,4,2), (50,6,3), (70,8,2)"));
txnMgr.acquireLocks(driver.getPlan(), ctx, "T1");
locks = getLocks(txnMgr, true);
@@ -1974,7 +1974,7 @@ public class TestDbTxnManager2 {
checkCmdOnDriver(driver.run("insert into target partition(p,q) values (1,2,1,2), (3,4,1,2), (5,6,1,3), (7,8,2,2)"));
checkCmdOnDriver(driver.run("create table source (a1 int, b1 int, p1 int, q1 int)"));
- long txnId1 = txnMgr.openTxn("T1");
+ long txnId1 = txnMgr.openTxn(ctx, "T1");
checkCmdOnDriver(driver.compileAndRespond("update target set b = 2 where p=1"));
txnMgr.acquireLocks(driver.getPlan(), ctx, "T1");
List<ShowLocksResponseElement> locks = getLocks(txnMgr, true);
@@ -1984,7 +1984,7 @@ public class TestDbTxnManager2 {
DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
//start a 2nd (overlapping) txn
- long txnid2 = txnMgr2.openTxn("T2");
+ long txnid2 = txnMgr2.openTxn(ctx, "T2");
checkCmdOnDriver(driver.compileAndRespond("merge into target using source " +
"on target.p=source.p1 and target.a=source.a1 " +
"when matched then update set b = 11 " +
http://git-wip-us.apache.org/repos/asf/hive/blob/ee35ccb1/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out b/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
index 9862887..a9814f3 100644
--- a/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
+++ b/ql/src/test/results/clientpositive/dbtxnmgr_showlocks.q.out
@@ -2,17 +2,17 @@ PREHOOK: query: show locks
PREHOOK: type: SHOWLOCKS
POSTHOOK: query: show locks
POSTHOOK: type: SHOWLOCKS
-Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname Agent Info
+Lock ID Database Table Partition State Blocked By Type Transaction ID Last Heartbeat Acquired At User Hostname Agent Info
PREHOOK: query: show locks extended
PREHOOK: type: SHOWLOCKS
POSTHOOK: query: show locks extended
POSTHOOK: type: SHOWLOCKS
-Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname Agent Info
+Lock ID Database Table Partition State Blocked By Type Transaction ID Last Heartbeat Acquired At User Hostname Agent Info
PREHOOK: query: show locks default
PREHOOK: type: SHOWLOCKS
POSTHOOK: query: show locks default
POSTHOOK: type: SHOWLOCKS
-Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname Agent Info
+Lock ID Database Table Partition State Blocked By Type Transaction ID Last Heartbeat Acquired At User Hostname Agent Info
PREHOOK: query: show transactions
PREHOOK: type: SHOW TRANSACTIONS
POSTHOOK: query: show transactions
@@ -30,27 +30,27 @@ PREHOOK: query: show locks database default
PREHOOK: type: SHOWLOCKS
POSTHOOK: query: show locks database default
POSTHOOK: type: SHOWLOCKS
-Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname Agent Info
+Lock ID Database Table Partition State Blocked By Type Transaction ID Last Heartbeat Acquired At User Hostname Agent Info
PREHOOK: query: show locks partitioned_acid_table
PREHOOK: type: SHOWLOCKS
POSTHOOK: query: show locks partitioned_acid_table
POSTHOOK: type: SHOWLOCKS
-Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname Agent Info
+Lock ID Database Table Partition State Blocked By Type Transaction ID Last Heartbeat Acquired At User Hostname Agent Info
PREHOOK: query: show locks partitioned_acid_table extended
PREHOOK: type: SHOWLOCKS
POSTHOOK: query: show locks partitioned_acid_table extended
POSTHOOK: type: SHOWLOCKS
-Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname Agent Info
+Lock ID Database Table Partition State Blocked By Type Transaction ID Last Heartbeat Acquired At User Hostname Agent Info
PREHOOK: query: show locks partitioned_acid_table partition (p='abc')
PREHOOK: type: SHOWLOCKS
POSTHOOK: query: show locks partitioned_acid_table partition (p='abc')
POSTHOOK: type: SHOWLOCKS
-Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname Agent Info
+Lock ID Database Table Partition State Blocked By Type Transaction ID Last Heartbeat Acquired At User Hostname Agent Info
PREHOOK: query: show locks partitioned_acid_table partition (p='abc') extended
PREHOOK: type: SHOWLOCKS
POSTHOOK: query: show locks partitioned_acid_table partition (p='abc') extended
POSTHOOK: type: SHOWLOCKS
-Lock ID Database Table Partition State Blocked By Type Transaction ID Last Hearbeat Acquired At User Hostname Agent Info
+Lock ID Database Table Partition State Blocked By Type Transaction ID Last Heartbeat Acquired At User Hostname Agent Info
PREHOOK: query: insert into partitioned_acid_table partition(p='abc') values(1,2)
PREHOOK: type: QUERY
PREHOOK: Output: default@partitioned_acid_table@p=abc