You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2017/05/01 16:43:34 UTC
[1/3] hive git commit: HIVE-12636 Ensure that all queries (with
DbTxnManager) run in a transaction (Eugene Koifman, reviewed by Wei Zheng)
Repository: hive
Updated Branches:
refs/heads/master 41c383287 -> 21909601f
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index afebf03..c31241a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.lockmgr;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
import org.apache.hadoop.hive.metastore.api.DataOperationType;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -44,32 +45,51 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
/**
* See additional tests in {@link org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager}
* Tests here are "end-to-end"ish and simulate concurrent queries.
*
* The general approach is to use an instance of Driver to use Driver.run() to create tables
- * Use Driver.compile() to generate QueryPlan which can then be passed to HiveTxnManager.acquireLocks().
- * Same HiveTxnManager is used to openTxn()/commitTxn() etc. This can exercise almost the entire
+ * Use Driver.compileAndRespond() (which also starts a txn) to generate QueryPlan which can then be
+ * passed to HiveTxnManager.acquireLocks().
+ * Same HiveTxnManager is used to commitTxn()/rollback etc. This can exercise almost the entire
* code path that CLI would but with the advantage that you can create a 2nd HiveTxnManager and then
* simulate interleaved transactional/locking operations but all from within a single thread.
* The later not only controls concurrency precisely but is the only way to run in UT env with DerbyDB.
+ *
+ * A slightly different (and simpler) approach is to use "start transaction/(commit/rollback)"
+ * command with the Driver.run(). This allows you to "see" the state of the Lock Manager after
+ * each statement and can also simulate concurrent (but very controlled) work but w/o forking any
+ * threads. The limitation here is that not all statements are allowed in an explicit transaction.
+ * For example, "drop table foo". This approach will also cause the query to execute which will
+ * make tests slower but will exercise the code path that is much closer to the actual user calls.
+ *
+ * In either approach, each logical "session" should use its own Transaction Manager. This requires
+ * using {@link #swapTxnManager(HiveTxnManager)} since in the SessionState the TM is associated with
+ * each thread.
*/
public class TestDbTxnManager2 {
+ private static final Logger LOG = LoggerFactory.getLogger(TestDbTxnManager2.class);
private static HiveConf conf = new HiveConf(Driver.class);
private HiveTxnManager txnMgr;
private Context ctx;
private Driver driver;
- TxnStore txnHandler;
+ private TxnStore txnHandler;
public TestDbTxnManager2() throws Exception {
conf
@@ -106,7 +126,6 @@ public class TestDbTxnManager2 {
checkCmdOnDriver(driver.run("create table if not exists R (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"));
checkCmdOnDriver(driver.compileAndRespond("delete from S where a in (select a from T where b = 1)"));
- txnMgr.openTxn(ctx, "one");
txnMgr.acquireLocks(driver.getPlan(), ctx, "one");
List<ShowLocksResponseElement> locks = getLocks();
Assert.assertEquals("Unexpected lock count", 2, locks.size());
@@ -115,7 +134,6 @@ public class TestDbTxnManager2 {
txnMgr.rollbackTxn();
checkCmdOnDriver(driver.compileAndRespond("update S set a = 7 where a in (select a from T where b = 1)"));
- txnMgr.openTxn(ctx, "one");
txnMgr.acquireLocks(driver.getPlan(), ctx, "one");
locks = getLocks();
Assert.assertEquals("Unexpected lock count", 2, locks.size());
@@ -124,13 +142,13 @@ public class TestDbTxnManager2 {
txnMgr.rollbackTxn();
checkCmdOnDriver(driver.compileAndRespond("insert into R select * from S where a in (select a from T where b = 1)"));
- txnMgr.openTxn(ctx, "three");
txnMgr.acquireLocks(driver.getPlan(), ctx, "three");
locks = getLocks();
Assert.assertEquals("Unexpected lock count", 3, locks.size());
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T", null, locks);
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "S", null, locks);
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "R", null, locks);
+ txnMgr.rollbackTxn();
}
@Test
public void createTable() throws Exception {
@@ -141,7 +159,7 @@ public class TestDbTxnManager2 {
List<ShowLocksResponseElement> locks = getLocks();
Assert.assertEquals("Unexpected lock count", 1, locks.size());
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", null, null, locks);
- txnMgr.getLockManager().releaseLocks(ctx.getHiveLocks());
+ txnMgr.commitTxn();
Assert.assertEquals("Lock remained", 0, getLocks().size());
}
@Test
@@ -158,7 +176,7 @@ public class TestDbTxnManager2 {
Assert.assertEquals("Unexpected lock count", 2, locks.size());
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T2", null, locks);
checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T3", null, locks);
- txnMgr.getLockManager().releaseLocks(ctx.getHiveLocks());
+ txnMgr.commitTxn();
Assert.assertEquals("Lock remained", 0, getLocks().size());
cpr = driver.run("drop table if exists T1");
checkCmdOnDriver(cpr);
@@ -179,7 +197,7 @@ public class TestDbTxnManager2 {
Assert.assertEquals("Unexpected lock count", 2, locks.size());
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T5", null, locks);
checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T4", null, locks);
- txnMgr.getLockManager().releaseLocks(ctx.getHiveLocks());
+ txnMgr.commitTxn();
Assert.assertEquals("Lock remained", 0, getLocks().size());
cpr = driver.run("drop table if exists T5");
checkCmdOnDriver(cpr);
@@ -195,23 +213,23 @@ public class TestDbTxnManager2 {
checkCmdOnDriver(cpr);
txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");//gets S lock on T6
List<HiveLock> selectLocks = ctx.getHiveLocks();
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ swapTxnManager(txnMgr2);
cpr = driver.compileAndRespond("drop table if exists T6");
checkCmdOnDriver(cpr);
//tries to get X lock on T1 and gets Waiting state
- LockState lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Fiddler", false);
+ LockState lockState = ((DbTxnManager) txnMgr2).acquireLocks(driver.getPlan(), ctx, "Fiddler", false);
List<ShowLocksResponseElement> locks = getLocks();
Assert.assertEquals("Unexpected lock count", 2, locks.size());
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T6", null, locks);
checkLock(LockType.EXCLUSIVE, LockState.WAITING, "default", "T6", null, locks);
- txnMgr.getLockManager().releaseLocks(selectLocks);//release S on T6
+ txnMgr.rollbackTxn();//release S on T6
//attempt to X on T6 again - succeed
lockState = ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(1).getLockid());
locks = getLocks();
Assert.assertEquals("Unexpected lock count", 1, locks.size());
checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T6", null, locks);
- List<HiveLock> xLock = new ArrayList<HiveLock>(0);
- xLock.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
- txnMgr.getLockManager().releaseLocks(xLock);
+ txnMgr2.rollbackTxn();
cpr = driver.run("drop table if exists T6");
locks = getLocks();
Assert.assertEquals("Unexpected number of locks found", 0, locks.size());
@@ -224,26 +242,23 @@ public class TestDbTxnManager2 {
checkCmdOnDriver(cpr);
cpr = driver.run("create table if not exists temp.T7(a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
checkCmdOnDriver(cpr);
- cpr = driver.compileAndRespond("update temp.T7 set a = 5 where b = 6");
+ cpr = driver.compileAndRespond("update temp.T7 set a = 5 where b = 6");//gets SS lock on T7
checkCmdOnDriver(cpr);
- txnMgr.openTxn(ctx, "Fifer");
txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");
- checkCmdOnDriver(driver.compileAndRespond("drop database if exists temp"));
HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
- //txnMgr2.openTxn("Fiddler");
- ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Fiddler", false);//gets SS lock on T7
+ swapTxnManager(txnMgr2);
+ checkCmdOnDriver(driver.compileAndRespond("drop database if exists temp"));
+ ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Fiddler", false);
List<ShowLocksResponseElement> locks = getLocks();
Assert.assertEquals("Unexpected lock count", 2, locks.size());
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "temp", "T7", null, locks);
checkLock(LockType.EXCLUSIVE, LockState.WAITING, "temp", null, null, locks);
txnMgr.commitTxn();
- ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(1).getLockid());
+ ((DbLockManager)txnMgr2.getLockManager()).checkLock(locks.get(1).getLockid());
locks = getLocks();
Assert.assertEquals("Unexpected lock count", 1, locks.size());
checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "temp", null, null, locks);
- List<HiveLock> xLock = new ArrayList<HiveLock>(0);
- xLock.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
- txnMgr2.getLockManager().releaseLocks(xLock);
+ txnMgr2.commitTxn();
}
@Test
public void updateSelectUpdate() throws Exception {
@@ -252,12 +267,12 @@ public class TestDbTxnManager2 {
checkCmdOnDriver(cpr);
cpr = driver.compileAndRespond("delete from T8 where b = 89");
checkCmdOnDriver(cpr);
- txnMgr.openTxn(ctx, "Fifer");
txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");//gets SS lock on T8
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ swapTxnManager(txnMgr2);
+ checkCmdOnDriver(driver.run("start transaction"));
cpr = driver.compileAndRespond("select a from T8");//gets S lock on T8
checkCmdOnDriver(cpr);
- HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
- txnMgr2.openTxn(ctx, "Fiddler");
txnMgr2.acquireLocks(driver.getPlan(), ctx, "Fiddler");
checkCmdOnDriver(driver.compileAndRespond("update T8 set a = 1 where b = 1"));
((DbTxnManager) txnMgr2).acquireLocks(driver.getPlan(), ctx, "Practical", false);//waits for SS lock on T8 from fifer
@@ -266,13 +281,14 @@ public class TestDbTxnManager2 {
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T8", null, locks);
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "T8", null, locks);
checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "T8", null, locks);
- txnMgr.rollbackTxn();
+ driver.releaseLocksAndCommitOrRollback(false, txnMgr);
((DbLockManager)txnMgr2.getLockManager()).checkLock(locks.get(2).getLockid());
locks = getLocks();
Assert.assertEquals("Unexpected lock count", 2, locks.size());
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T8", null, locks);
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "T8", null, locks);
- txnMgr2.commitTxn();
+ driver.releaseLocksAndCommitOrRollback(true, txnMgr2);
+ swapTxnManager(txnMgr);
cpr = driver.run("drop table if exists T6");
locks = getLocks();
Assert.assertEquals("Unexpected number of locks found", 0, locks.size());
@@ -284,21 +300,20 @@ public class TestDbTxnManager2 {
dropTable(new String[] {"T9"});
conf.setIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES, 2);
conf.setBoolVar(HiveConf.ConfVars.TXN_MGR_DUMP_LOCK_STATE_ON_ACQUIRE_TIMEOUT, true);
- HiveTxnManager otherTxnMgr = new DbTxnManager();
- ((DbTxnManager)otherTxnMgr).setHiveConf(conf);
CommandProcessorResponse cpr = driver.run("create table T9(a int)");
checkCmdOnDriver(cpr);
cpr = driver.compileAndRespond("select * from T9");
checkCmdOnDriver(cpr);
txnMgr.acquireLocks(driver.getPlan(), ctx, "Vincent Vega");
- List<ShowLocksResponseElement> locks = getLocks(txnMgr);
+ List<ShowLocksResponseElement> locks = getLocks();
Assert.assertEquals("Unexpected lock count", 1, locks.size());
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T9", null, locks);
-
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ swapTxnManager(txnMgr2);
cpr = driver.compileAndRespond("drop table T9");
checkCmdOnDriver(cpr);
try {
- otherTxnMgr.acquireLocks(driver.getPlan(), ctx, "Winston Winnfield");
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "Winston Winnfield");
}
catch(LockException ex) {
Assert.assertEquals("Got wrong lock exception", ErrorMsg.LOCK_ACQUIRE_TIMEDOUT, ex.getCanonicalErrorMsg());
@@ -306,7 +321,7 @@ public class TestDbTxnManager2 {
locks = getLocks(txnMgr);
Assert.assertEquals("Unexpected lock count", 1, locks.size());
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T9", null, locks);
- otherTxnMgr.closeTxnManager();
+ txnMgr2.closeTxnManager();
}
/**
@@ -324,13 +339,15 @@ public class TestDbTxnManager2 {
cpr = driver.compileAndRespond("select * from TAB_BLOCKED");
checkCmdOnDriver(cpr);
txnMgr.acquireLocks(driver.getPlan(), ctx, "I AM SAM");
- List<ShowLocksResponseElement> locks = getLocks(txnMgr);
+ List<ShowLocksResponseElement> locks = getLocks();
Assert.assertEquals("Unexpected lock count", 1, locks.size());
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB_BLOCKED", null, locks);
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ swapTxnManager(txnMgr2);
cpr = driver.compileAndRespond("drop table TAB_BLOCKED");
checkCmdOnDriver(cpr);
- ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "SAM I AM", false);//make non-blocking
- locks = getLocks(txnMgr);
+ ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "SAM I AM", false);//make non-blocking
+ locks = getLocks();
Assert.assertEquals("Unexpected lock count", 2, locks.size());
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB_BLOCKED", null, locks);
checkLock(LockType.EXCLUSIVE, LockState.WAITING, "default", "TAB_BLOCKED", null, locks);
@@ -597,9 +614,7 @@ public class TestDbTxnManager2 {
List<ShowLocksResponseElement> locks = getLocks();
Assert.assertEquals("Unexpected lock count", 1, locks.size());
checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "nonAcidPart", null, locks);
- List<HiveLock> relLocks = new ArrayList<HiveLock>(1);
- relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
- txnMgr.getLockManager().releaseLocks(relLocks);
+ txnMgr.rollbackTxn();;
cpr = driver.compileAndRespond("insert into nonAcidPart partition(p=1) values(5,6)");
checkCmdOnDriver(cpr);
@@ -607,9 +622,7 @@ public class TestDbTxnManager2 {
locks = getLocks();
Assert.assertEquals("Unexpected lock count", 1, locks.size());
checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "nonAcidPart", "p=1", locks);
- relLocks = new ArrayList<HiveLock>(1);
- relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
- txnMgr.getLockManager().releaseLocks(relLocks);
+ txnMgr.rollbackTxn();
cpr = driver.compileAndRespond("insert into acidPart partition(p) values(1,2,3)");
checkCmdOnDriver(cpr);
@@ -617,9 +630,7 @@ public class TestDbTxnManager2 {
locks = getLocks();
Assert.assertEquals("Unexpected lock count", 1, locks.size());
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "acidPart", null, locks);
- relLocks = new ArrayList<HiveLock>(1);
- relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
- txnMgr.getLockManager().releaseLocks(relLocks);
+ txnMgr.rollbackTxn();
cpr = driver.compileAndRespond("insert into acidPart partition(p=1) values(5,6)");
checkCmdOnDriver(cpr);
@@ -627,19 +638,15 @@ public class TestDbTxnManager2 {
locks = getLocks();
Assert.assertEquals("Unexpected lock count", 1, locks.size());
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "acidPart", "p=1", locks);
- relLocks = new ArrayList<HiveLock>(1);
- relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
- txnMgr.getLockManager().releaseLocks(relLocks);
-
+ txnMgr.rollbackTxn();
+
cpr = driver.compileAndRespond("update acidPart set b = 17 where a = 1");
checkCmdOnDriver(cpr);
lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false);
locks = getLocks();
Assert.assertEquals("Unexpected lock count", 1, locks.size());
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "acidPart", null, locks);
- relLocks = new ArrayList<HiveLock>(1);
- relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
- txnMgr.getLockManager().releaseLocks(relLocks);
+ txnMgr.rollbackTxn();
cpr = driver.compileAndRespond("update acidPart set b = 17 where p = 1");
checkCmdOnDriver(cpr);
@@ -647,9 +654,7 @@ public class TestDbTxnManager2 {
locks = getLocks();
Assert.assertEquals("Unexpected lock count", 1, locks.size());
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "acidPart", null, locks);//https://issues.apache.org/jira/browse/HIVE-13212
- relLocks = new ArrayList<HiveLock>(1);
- relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
- txnMgr.getLockManager().releaseLocks(relLocks);
+ txnMgr.rollbackTxn();
}
/**
* Check to make sure we acquire proper locks for queries involving acid and non-acid tables
@@ -726,6 +731,16 @@ public class TestDbTxnManager2 {
throw new IllegalStateException("How did it get here?!");
}
+ /**
+ * SessionState is stored in ThreadLocal; UnitTest runs in a single thread (otherwise Derby wedges)
+ * {@link HiveTxnManager} instances are per SessionState.
+ * So to be able to simulate concurrent locks/transactions w/o forking threads we just swap
+ * the TxnManager instance in the session (hacky but nothing is actually threading so it allows us
+ * to write good tests)
+ */
+ private static HiveTxnManager swapTxnManager(HiveTxnManager txnMgr) {
+ return SessionState.get().setTxnMgr(txnMgr);
+ }
@Test
public void testShowLocksFilterOptions() throws Exception {
CommandProcessorResponse cpr = driver.run("drop table if exists db1.t14");
@@ -756,26 +771,32 @@ public class TestDbTxnManager2 {
// Acquire different locks at different levels
+ HiveTxnManager txnMgr1 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ swapTxnManager(txnMgr1);
cpr = driver.compileAndRespond("insert into table db1.t14 partition (ds='today') values (1, 2)");
checkCmdOnDriver(cpr);
- txnMgr.acquireLocks(driver.getPlan(), ctx, "Tom");
+ txnMgr1.acquireLocks(driver.getPlan(), ctx, "Tom");
HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ swapTxnManager(txnMgr2);
cpr = driver.compileAndRespond("insert into table db1.t14 partition (ds='tomorrow') values (3, 4)");
checkCmdOnDriver(cpr);
txnMgr2.acquireLocks(driver.getPlan(), ctx, "Jerry");
HiveTxnManager txnMgr3 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ swapTxnManager(txnMgr3);
cpr = driver.compileAndRespond("select * from db2.t15");
checkCmdOnDriver(cpr);
txnMgr3.acquireLocks(driver.getPlan(), ctx, "Donald");
HiveTxnManager txnMgr4 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ swapTxnManager(txnMgr4);
cpr = driver.compileAndRespond("select * from db2.t16");
checkCmdOnDriver(cpr);
txnMgr4.acquireLocks(driver.getPlan(), ctx, "Hillary");
HiveTxnManager txnMgr5 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ swapTxnManager(txnMgr5);
cpr = driver.compileAndRespond("select * from db2.t14");
checkCmdOnDriver(cpr);
txnMgr5.acquireLocks(driver.getPlan(), ctx, "Obama");
@@ -799,6 +820,7 @@ public class TestDbTxnManager2 {
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "db2", "t14", null, locks);
// SHOW LOCKS t14
+ swapTxnManager(txnMgr);
cpr = driver.run("use db1");
checkCmdOnDriver(cpr);
locks = getLocksWithFilterOptions(txnMgr, null, "t14", null);
@@ -847,14 +869,13 @@ public class TestDbTxnManager2 {
checkCmdOnDriver(cpr);
checkCmdOnDriver(driver.compileAndRespond("select * from TAB_PART"));
- txnMgr.openTxn(ctx, "Nicholas");
- checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
txnMgr.acquireLocks(driver.getPlan(), ctx, "Nicholas");
- HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
txnMgr.commitTxn();
- txnMgr2.openTxn(ctx, "Alexandra");
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ swapTxnManager(txnMgr2);
checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
- txnMgr2.acquireLocks(driver.getPlan(), ctx, "Nicholas");
+ checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "Alexandra");
txnMgr2.commitTxn();
}
private void dropTable(String[] tabs) throws Exception {
@@ -899,16 +920,17 @@ public class TestDbTxnManager2 {
"partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
checkCmdOnDriver(cpr);
checkCmdOnDriver(driver.run("insert into TAB_PART partition(p='blah') values(1,2)"));
- HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
- long txnId = txnMgr.openTxn(ctx, "Known");
- long txnId2 = txnMgr2.openTxn(ctx, "Unknown");
checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
+ long txnId = txnMgr.getCurrentTxnId();
txnMgr.acquireLocks(driver.getPlan(), ctx, "Known");
List<ShowLocksResponseElement> locks = getLocks(txnMgr);
Assert.assertEquals("Unexpected lock count", 1, locks.size());
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", "p=blah", locks);
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ swapTxnManager(txnMgr2);
checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
+ long txnId2 = txnMgr2.getCurrentTxnId();
((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Unknown", false);
locks = getLocks(txnMgr2);//should not matter which txnMgr is used here
Assert.assertEquals("Unexpected lock count", 2, locks.size());
@@ -933,7 +955,7 @@ public class TestDbTxnManager2 {
Assert.assertTrue("Didn't get exception", expectedException != null);
Assert.assertEquals("Got wrong message code", ErrorMsg.TXN_ABORTED, expectedException.getCanonicalErrorMsg());
Assert.assertEquals("Exception msg didn't match",
- "Aborting [txnid:3,3] due to a write conflict on default/TAB_PART/p=blah committed by [txnid:2,3] u/u",
+ "Aborting [txnid:"+txnId2+","+txnId2+"] due to a write conflict on default/TAB_PART/p=blah committed by [txnid:"+txnId+","+txnId2+"] u/u",
expectedException.getCause().getMessage());
}
/**
@@ -1067,7 +1089,7 @@ public class TestDbTxnManager2 {
Assert.assertEquals("Unexpected lock count", 1, locks.size());
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB2", null, locks);
HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
- txnMgr2.openTxn(ctx, "Horton");
+ swapTxnManager(txnMgr2);
checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 17 where a = 101"));
txnMgr2.acquireLocks(driver.getPlan(), ctx, "Horton");
Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
@@ -1080,6 +1102,7 @@ public class TestDbTxnManager2 {
locks = getLocks(txnMgr);
Assert.assertEquals("Unexpected lock count", 1, locks.size());
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB2", null, locks);
+ txnMgr.commitTxn();
TestTxnCommands2.runHouseKeeperService(new AcidWriteSetService(), conf);
Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
}
@@ -1097,40 +1120,40 @@ public class TestDbTxnManager2 {
checkCmdOnDriver(cpr);
checkCmdOnDriver(driver.run("insert into tab2 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1
HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
-
+ swapTxnManager(txnMgr2);
//test with predicates such that partition pruning works
- txnMgr2.openTxn(ctx, "T2");
checkCmdOnDriver(driver.compileAndRespond("update tab2 set b = 7 where p='two'"));
+ long idTxnUpdate1 = txnMgr2.getCurrentTxnId();
txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
List<ShowLocksResponseElement> locks = getLocks(txnMgr2);
Assert.assertEquals("Unexpected lock count", 1, locks.size());
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=two", locks);
//now start concurrent txn
- txnMgr.openTxn(ctx, "T3");
+ swapTxnManager(txnMgr);
checkCmdOnDriver(driver.compileAndRespond("update tab2 set b = 7 where p='one'"));
+ long idTxnUpdate2 = txnMgr.getCurrentTxnId();
((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
locks = getLocks(txnMgr);
Assert.assertEquals("Unexpected lock count", 2, locks.size());
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=two", locks);
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=one", locks);
-
//this simulates the completion of txnid:2
+ //this simulates the completion of txnid:idTxnUpdate1
AddDynamicPartitions adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab2",
Collections.singletonList("p=two"));
adp.setOperationType(DataOperationType.UPDATE);
txnHandler.addDynamicPartitions(adp);
- txnMgr2.commitTxn();//txnid:2
-
+ txnMgr2.commitTxn();//txnid:idTxnUpdate1
locks = getLocks(txnMgr2);
Assert.assertEquals("Unexpected lock count", 1, locks.size());
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=one", locks);
- //completion of txnid:3
+ //completion of txnid:idTxnUpdate2
adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab2",
Collections.singletonList("p=one"));
adp.setOperationType(DataOperationType.UPDATE);
txnHandler.addDynamicPartitions(adp);
- txnMgr.commitTxn();//txnid:3
+ txnMgr.commitTxn();//txnid:idTxnUpdate2
//now both txns concurrently updated TAB2 but different partitions.
Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
@@ -1147,8 +1170,9 @@ public class TestDbTxnManager2 {
"clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
checkCmdOnDriver(cpr);
checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:4
- txnMgr2.openTxn(ctx, "T5");
+ swapTxnManager(txnMgr2);
checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1"));
+ long idTxnUpdate3 = txnMgr2.getCurrentTxnId();
txnMgr2.acquireLocks(driver.getPlan(), ctx, "T5");
locks = getLocks(txnMgr2);
Assert.assertEquals("Unexpected lock count", 2, locks.size());
@@ -1156,8 +1180,9 @@ public class TestDbTxnManager2 {
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks);
//now start concurrent txn
- txnMgr.openTxn(ctx, "T6");
+ swapTxnManager(txnMgr);
checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b = 2"));
+ long idTxnUpdate4 = txnMgr.getCurrentTxnId();
((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T6", false);
locks = getLocks(txnMgr);
Assert.assertEquals("Unexpected lock count", 4, locks.size());
@@ -1166,24 +1191,24 @@ public class TestDbTxnManager2 {
checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks);
checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=one", locks);
- //this simulates the completion of txnid:5
+ //this simulates the completion of txnid:idTxnUpdate3
adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
Collections.singletonList("p=one"));
adp.setOperationType(DataOperationType.UPDATE);
txnHandler.addDynamicPartitions(adp);
- txnMgr2.commitTxn();//txnid:5
+ txnMgr2.commitTxn();//txnid:idTxnUpdate3
((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id)
locks = getLocks(txnMgr);
Assert.assertEquals("Unexpected lock count", 2, locks.size());
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks);
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks);
- //completion of txnid:6
+ //completion of txnid:idTxnUpdate4
adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
Collections.singletonList("p=two"));
adp.setOperationType(DataOperationType.UPDATE);
txnHandler.addDynamicPartitions(adp);
- txnMgr.commitTxn();//txnid:6
+ txnMgr.commitTxn();//txnid:idTxnUpdate4
Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u' and ws_table='tab1'"));
@@ -1202,10 +1227,11 @@ public class TestDbTxnManager2 {
CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
"clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
checkCmdOnDriver(cpr);
- checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1
+ checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));
HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
- txnMgr2.openTxn(ctx, "T2");
+ swapTxnManager(txnMgr2);
checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1"));
+ long idTxnUpdate1 = txnMgr2.getCurrentTxnId();
txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
List<ShowLocksResponseElement> locks = getLocks(txnMgr2);
Assert.assertEquals("Unexpected lock count", 2, locks.size());
@@ -1213,8 +1239,9 @@ public class TestDbTxnManager2 {
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks);
//now start concurrent txn
- txnMgr.openTxn(ctx, "T3");
+ swapTxnManager(txnMgr);
checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where p='two'"));
+ long idTxnUpdate2 = txnMgr.getCurrentTxnId();
((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
locks = getLocks(txnMgr);
Assert.assertEquals("Unexpected lock count", 3, locks.size());
@@ -1222,23 +1249,23 @@ public class TestDbTxnManager2 {
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks);
checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks);
- //this simulates the completion of txnid:2
+ //this simulates the completion of txnid:idTxnUpdate1
AddDynamicPartitions adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
Collections.singletonList("p=one"));
adp.setOperationType(DataOperationType.UPDATE);
txnHandler.addDynamicPartitions(adp);
- txnMgr2.commitTxn();//txnid:2
+ txnMgr2.commitTxn();//txnid:idTxnUpdate1
((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id)
locks = getLocks(txnMgr);
Assert.assertEquals("Unexpected lock count", 1, locks.size());
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks);
- //completion of txnid:3
+ //completion of txnid:idTxnUpdate2
adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
Collections.singletonList("p=two"));
adp.setOperationType(DataOperationType.UPDATE);
txnHandler.addDynamicPartitions(adp);
- txnMgr.commitTxn();//txnid:3
+ txnMgr.commitTxn();//txnid:idTxnUpdate2
Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u' and ws_table='tab1'"));
@@ -1256,10 +1283,11 @@ public class TestDbTxnManager2 {
CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
"clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
checkCmdOnDriver(cpr);
- checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1
+ checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));
HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
- txnMgr2.openTxn(ctx, "T2");
+ swapTxnManager(txnMgr2);
checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1"));
+ long idTxnUpdate1 = txnMgr2.getCurrentTxnId();
txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
List<ShowLocksResponseElement> locks = getLocks(txnMgr2);
Assert.assertEquals("Unexpected lock count", 2, locks.size());
@@ -1267,8 +1295,9 @@ public class TestDbTxnManager2 {
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks);
//now start concurrent txn
- txnMgr.openTxn(ctx, "T3");
+ swapTxnManager(txnMgr);
checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2"));
+ long idTxnDelete1 = txnMgr.getCurrentTxnId();
((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
locks = getLocks(txnMgr);
Assert.assertEquals("Unexpected lock count", 3, locks.size());
@@ -1276,30 +1305,30 @@ public class TestDbTxnManager2 {
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks);
checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks);
- //this simulates the completion of txnid:2
+ //this simulates the completion of txnid:idTxnUpdate1
AddDynamicPartitions adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
Collections.singletonList("p=one"));
adp.setOperationType(DataOperationType.UPDATE);
txnHandler.addDynamicPartitions(adp);
- txnMgr2.commitTxn();//txnid:2
+ txnMgr2.commitTxn();//txnid:idTxnUpdate1
((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id)
locks = getLocks(txnMgr);
Assert.assertEquals("Unexpected lock count", 1, locks.size());
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks);
- //completion of txnid:3
+ //completion of txnid:idTxnDelete1
adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
Collections.singletonList("p=two"));
adp.setOperationType(DataOperationType.DELETE);
txnHandler.addDynamicPartitions(adp);
- txnMgr.commitTxn();//txnid:3
+ txnMgr.commitTxn();//txnid:idTxnDelete1
Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
- 2, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=1 and ctc_table='tab1'"));
+ 2, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + (idTxnUpdate1 - 1) + " and ctc_table='tab1'"));
Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
- 1, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=2 and ctc_table='tab1' and ctc_partition='p=one'"));
+ 1, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + idTxnUpdate1 + " and ctc_table='tab1' and ctc_partition='p=one'"));
Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
- 1, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=3 and ctc_table='tab1' and ctc_partition='p=two'"));
+ 1, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + idTxnDelete1 + " and ctc_table='tab1' and ctc_partition='p=two'"));
Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u' and ws_table='tab1'"));
Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
@@ -1318,7 +1347,7 @@ public class TestDbTxnManager2 {
checkCmdOnDriver(cpr);
checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1
HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
- txnMgr2.openTxn(ctx, "T2");
+ swapTxnManager(txnMgr2);
checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=2"));
txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
List<ShowLocksResponseElement> locks = getLocks(txnMgr2);
@@ -1327,7 +1356,7 @@ public class TestDbTxnManager2 {
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks);
//now start concurrent txn
- txnMgr.openTxn(ctx, "T3");
+ swapTxnManager(txnMgr);
checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2"));
((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
locks = getLocks(txnMgr);
@@ -1336,32 +1365,32 @@ public class TestDbTxnManager2 {
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks);
checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks);
- //this simulates the completion of txnid:2
+ //this simulates the completion of "update tab1" txn
AddDynamicPartitions adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
Collections.singletonList("p=two"));
adp.setOperationType(DataOperationType.UPDATE);
txnHandler.addDynamicPartitions(adp);
- txnMgr2.commitTxn();//txnid:2
+ txnMgr2.commitTxn();//"update tab1"
((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id)
locks = getLocks(txnMgr);
Assert.assertEquals("Unexpected lock count", 1, locks.size());
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks);
- //completion of txnid:3
+ //completion of "delete from tab1" txn
adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
Collections.singletonList("p=two"));
adp.setOperationType(DataOperationType.DELETE);
txnHandler.addDynamicPartitions(adp);
LockException exception = null;
try {
- txnMgr.commitTxn();//txnid:3
+ txnMgr.commitTxn();//"delete from tab1"
}
catch(LockException e) {
exception = e;
}
Assert.assertNotEquals("Expected exception", null, exception);
Assert.assertEquals("Exception msg doesn't match",
- "Aborting [txnid:3,3] due to a write conflict on default/tab1/p=two committed by [txnid:2,3] d/u",
+ "Aborting [txnid:5,5] due to a write conflict on default/tab1/p=two committed by [txnid:4,5] d/u",
exception.getCause().getMessage());
Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
@@ -1378,19 +1407,22 @@ public class TestDbTxnManager2 {
CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
"clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
checkCmdOnDriver(cpr);
- checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1
+ checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));
HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
- txnMgr2.openTxn(ctx, "T2");
- checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where b=2"));
+ swapTxnManager(txnMgr2);
+ checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where b=2"));//start "delete from tab1" txn
+ long txnIdDelete = txnMgr2.getCurrentTxnId();
txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
List<ShowLocksResponseElement> locks = getLocks(txnMgr2);
Assert.assertEquals("Unexpected lock count", 2, locks.size());
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks);
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks);
- //now start concurrent txn
- txnMgr.openTxn(ctx, "T3");
+ //now start concurrent "select * from tab1" txn
+ swapTxnManager(txnMgr);
+ checkCmdOnDriver(driver.run("start transaction"));//start explicit txn so that txnMgr knows it
checkCmdOnDriver(driver.compileAndRespond("select * from tab1 where b=1 and p='one'"));
+ long txnIdSelect = txnMgr.getCurrentTxnId();
((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2"));
((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
@@ -1402,12 +1434,12 @@ public class TestDbTxnManager2 {
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks);
checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks);
- //this simulates the completion of txnid:2
+ //this simulates the completion of "delete from tab1" txn
AddDynamicPartitions adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
Collections.singletonList("p=two"));
adp.setOperationType(DataOperationType.DELETE);
txnHandler.addDynamicPartitions(adp);
- txnMgr2.commitTxn();//txnid:2
+ txnMgr2.commitTxn();//"delete from tab1" txn
((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(4).getLockid());//retest WAITING locks (both have same ext id)
locks = getLocks(txnMgr);
@@ -1415,21 +1447,21 @@ public class TestDbTxnManager2 {
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB1", null, locks);
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB1", "p=one", locks);
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks);
- //completion of txnid:3
+ //completion of txnid:txnIdSelect
adp = new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
Collections.singletonList("p=two"));
adp.setOperationType(DataOperationType.DELETE);
txnHandler.addDynamicPartitions(adp);
- txnMgr.commitTxn();//txnid:3
+ txnMgr.commitTxn();//"select * from tab1" txn
Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
- 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=2"));
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=" + txnIdDelete));
Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
- 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=3"));
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=" + txnIdSelect));
Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
- 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=2"));
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=" + txnIdDelete));
Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
- 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=3"));
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1' and ws_txnid=" + txnIdSelect));
Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null"));
}
@@ -1448,7 +1480,7 @@ public class TestDbTxnManager2 {
1, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS"));
//only expect transactional components to be in COMPLETED_TXN_COMPONENTS
Assert.assertEquals(TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
- 1, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=1 and ctc_table='tab1'"));
+ 1, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=6 and ctc_table='tab1'"));
}
/**
@@ -1465,17 +1497,17 @@ public class TestDbTxnManager2 {
cpr = driver.run("create table if not exists tab_not_acid (a int, b int, p string)");
checkCmdOnDriver(cpr);
checkCmdOnDriver(driver.run("insert into tab_not_acid values(1,1,'one'),(2,2,'two')"));
- checkCmdOnDriver(driver.run("insert into tab1 partition(p) values(3,3,'one'),(4,4,'two')"));//txinid:1
+ checkCmdOnDriver(driver.run("insert into tab1 partition(p) values(3,3,'one'),(4,4,'two')"));//txnid:8
//writing both acid and non-acid resources in the same txn
//tab1 write is a dynamic partition insert
- checkCmdOnDriver(driver.run("from tab_not_acid insert into tab1 partition(p)(a,b,p) select a,b,p insert into tab_not_acid(a,b) select a,b where p='two'"));//txnid:2
+ checkCmdOnDriver(driver.run("from tab_not_acid insert into tab1 partition(p)(a,b,p) select a,b,p insert into tab_not_acid(a,b) select a,b where p='two'"));//txnid:9
Assert.assertEquals(TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS"));
//only expect transactional components to be in COMPLETED_TXN_COMPONENTS
Assert.assertEquals(TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
- 2, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=2"));
+ 2, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=9"));
Assert.assertEquals(TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
- 2, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=2 and ctc_table='tab1'"));
+ 2, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=9 and ctc_table='tab1'"));
}
//todo: Concurrent insert/update of same partition - should pass
@@ -1546,13 +1578,13 @@ public class TestDbTxnManager2 {
"(9,100,1,2), (3,4,1,2), (5,13,1,3), (7,8,2,2), (14,15,2,1)"));
- long txnId1 = txnMgr.openTxn(ctx, "T1");
checkCmdOnDriver(driver.compileAndRespond("merge into target t using source s on t.a=s.b " +
"when matched and t.a=5 then update set b=s.b " + //updates p=1/q=3
"when matched and t.a in (3,7) then delete " + //deletes from p=1/q=2, p=2/q=2
"when not matched and t.a >= 8 then insert values(s.a, s.b, s.p, s.q)"));//insert p=1/q=2, p=1/q=3 and new part 1/1
+ long txnId1 = txnMgr.getCurrentTxnId();
txnMgr.acquireLocks(driver.getPlan(), ctx, "T1");
- List<ShowLocksResponseElement> locks = getLocks(txnMgr);
+ List<ShowLocksResponseElement> locks = getLocks();
Assert.assertEquals("Unexpected lock count", 5, locks.size());
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks);
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "source", null, locks);
@@ -1562,13 +1594,14 @@ public class TestDbTxnManager2 {
//start concurrent txn
DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
- long txnId2 = txnMgr2.openTxn(ctx, "T2");
+ swapTxnManager(txnMgr2);
checkCmdOnDriver(driver.compileAndRespond("merge into target t using source2 s on t.a=s.b " +
"when matched and t.a=" + (cc ? 5 : 9) + " then update set b=s.b " + //if conflict updates p=1/q=3 else update p=1/q=2
"when matched and t.a in (3,7) then delete " + //deletes from p=1/q=2, p=2/q=2
"when not matched and t.a >= 8 then insert values(s.a, s.b, s.p, s.q)"));//insert p=1/q=2, p=1/q=3 and new part 1/1
+ long txnId2 = txnMgr2.getCurrentTxnId();
txnMgr2.acquireLocks(driver.getPlan(), ctx, "T1", false);
- locks = getLocks(txnMgr2);
+ locks = getLocks();
Assert.assertEquals("Unexpected lock count", 10, locks.size());
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks);
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "source", null, locks);
@@ -1639,7 +1672,7 @@ public class TestDbTxnManager2 {
//re-check locks which were in Waiting state - should now be Acquired
((DbLockManager)txnMgr2.getLockManager()).checkLock(extLockId);
- locks = getLocks(txnMgr2);
+ locks = getLocks();
Assert.assertEquals("Unexpected lock count", 5, locks.size());
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks);
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "source2", null, locks);
@@ -1693,9 +1726,9 @@ public class TestDbTxnManager2 {
}
if(cc) {
Assert.assertNotNull("didn't get exception", expectedException);
- Assert.assertEquals("Transaction manager has aborted the transaction txnid:3. Reason: " +
- "Aborting [txnid:3,3] due to a write conflict on default/target/p=1/q=3 " +
- "committed by [txnid:2,3] u/u", expectedException.getMessage());
+ Assert.assertEquals("Transaction manager has aborted the transaction txnid:11. Reason: " +
+ "Aborting [txnid:11,11] due to a write conflict on default/target/p=1/q=3 " +
+ "committed by [txnid:10,11] u/u", expectedException.getMessage());
Assert.assertEquals(
"COMPLETED_TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " +
TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
@@ -1753,14 +1786,13 @@ public class TestDbTxnManager2 {
"stored as orc TBLPROPERTIES ('transactional'='true')"));
checkCmdOnDriver(driver.run("insert into target values (1,2), (3,4), (5,6), (7,8)"));
checkCmdOnDriver(driver.run("create table source (a int, b int)"));
-
- long txnid1 = txnMgr.openTxn(ctx, "T1");
if(causeConflict) {
checkCmdOnDriver(driver.compileAndRespond("update target set b = 2 where a=1"));
}
else {
checkCmdOnDriver(driver.compileAndRespond("insert into target values(9,10),(11,12)"));
}
+ long txnid1 = txnMgr.getCurrentTxnId();
txnMgr.acquireLocks(driver.getPlan(), ctx, "T1");
Assert.assertEquals(
"TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid1) + "): " +
@@ -1774,14 +1806,15 @@ public class TestDbTxnManager2 {
LockState.ACQUIRED, "default", "target", null, locks);
DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ swapTxnManager(txnMgr2);
//start a 2nd (overlapping) txn
- long txnid2 = txnMgr2.openTxn(ctx, "T2");
checkCmdOnDriver(driver.compileAndRespond("merge into target t using source s " +
"on t.a=s.a " +
"when matched then delete " +
"when not matched then insert values(s.a,s.b)"));
+ long txnid2 = txnMgr2.getCurrentTxnId();
txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2", false);
- locks = getLocks(txnMgr);
+ locks = getLocks();
Assert.assertEquals("Unexpected lock count", 3, locks.size());
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", null, locks);
@@ -1801,7 +1834,7 @@ public class TestDbTxnManager2 {
//re-check locks which were in Waiting state - should now be Acquired
((DbLockManager)txnMgr2.getLockManager()).checkLock(extLockId);
- locks = getLocks(txnMgr);
+ locks = getLocks();
Assert.assertEquals("Unexpected lock count", 2, locks.size());
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "source", null, locks);
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", null, locks);
@@ -1830,7 +1863,7 @@ public class TestDbTxnManager2 {
Assert.assertTrue("Didn't get exception", expectedException != null);
Assert.assertEquals("Got wrong message code", ErrorMsg.TXN_ABORTED, expectedException.getCanonicalErrorMsg());
Assert.assertEquals("Exception msg didn't match",
- "Aborting [txnid:3,3] due to a write conflict on default/target committed by [txnid:2,3] d/u",
+ "Aborting [txnid:7,7] due to a write conflict on default/target committed by [txnid:6,7] d/u",
expectedException.getCause().getMessage());
}
else {
@@ -1922,21 +1955,22 @@ public class TestDbTxnManager2 {
checkCmdOnDriver(driver.run("insert into target partition(p,q) values (1,2,1,2), (3,4,1,2), (5,6,1,3), (7,8,2,2)"));
checkCmdOnDriver(driver.run("create table source (a1 int, b1 int, p1 int, q1 int)"));
- long txnId1 = txnMgr.openTxn(ctx, "T1");
checkCmdOnDriver(driver.compileAndRespond("update target set b = 2 where p=1"));
+ long txnId1 = txnMgr.getCurrentTxnId();
txnMgr.acquireLocks(driver.getPlan(), ctx, "T1");
- List<ShowLocksResponseElement> locks = getLocks(txnMgr);
+ List<ShowLocksResponseElement> locks = getLocks();
Assert.assertEquals("Unexpected lock count", 2, locks.size());
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=2", locks);
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=3", locks);
DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ swapTxnManager(txnMgr2);
//start a 2nd (overlapping) txn
- long txnid2 = txnMgr2.openTxn(ctx, "T2");
checkCmdOnDriver(driver.compileAndRespond("merge into target using source " +
"on target.p=source.p1 and target.a=source.a1 " +
"when matched then update set b = 11 " +
"when not matched then insert values(a1,b1,p1,q1)"));
+ long txnid2 = txnMgr2.getCurrentTxnId();
txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2", false);
locks = getLocks(txnMgr);
Assert.assertEquals("Unexpected lock count", 7, locks.size());
@@ -1982,7 +2016,7 @@ public class TestDbTxnManager2 {
//re-check locks which were in Waiting state - should now be Acquired
((DbLockManager)txnMgr2.getLockManager()).checkLock(extLockId);
- locks = getLocks(txnMgr);
+ locks = getLocks();
Assert.assertEquals("Unexpected lock count", 5, locks.size());
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "source", null, locks);
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks);
@@ -2031,7 +2065,7 @@ public class TestDbTxnManager2 {
Assert.assertTrue("Didn't get exception", expectedException != null);
Assert.assertEquals("Got wrong message code", ErrorMsg.TXN_ABORTED, expectedException.getCanonicalErrorMsg());
Assert.assertEquals("Exception msg didn't match",
- "Aborting [txnid:3,3] due to a write conflict on default/target/p=1/q=2 committed by [txnid:2,3] u/u",
+ "Aborting [txnid:7,7] due to a write conflict on default/target/p=1/q=2 committed by [txnid:6,7] u/u",
expectedException.getCause().getMessage());
}
else {
@@ -2046,6 +2080,12 @@ public class TestDbTxnManager2 {
TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_txnid=" + txnid2));
}
}
+
+ /**
+ * This test is mostly obsolete. The logic in Driver.java no longer acquires any locks for
+ * "show tables". Keeping the test for now in case we change that logic.
+ * @throws Exception
+ */
@Test
public void testShowTablesLock() throws Exception {
dropTable(new String[] {"T, T2"});
@@ -2061,6 +2101,8 @@ public class TestDbTxnManager2 {
checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "t", null, locks);
DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ txnMgr2.openTxn(ctx, "Fidler");
+ swapTxnManager(txnMgr2);
checkCmdOnDriver(driver.compileAndRespond("show tables"));
txnMgr2.acquireLocks(driver.getPlan(), ctx, "Fidler");
locks = getLocks();
@@ -2068,17 +2110,17 @@ public class TestDbTxnManager2 {
checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "t", null, locks);
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", null, null, locks);
txnMgr.commitTxn();
- txnMgr2.releaseLocks(txnMgr2.getLockManager().getLocks(false, false));
+ txnMgr2.rollbackTxn();
Assert.assertEquals("Lock remained", 0, getLocks().size());
Assert.assertEquals("Lock remained", 0, getLocks(txnMgr2).size());
+ swapTxnManager(txnMgr);
cpr = driver.run(
"create table if not exists T2 (a int, b int) partitioned by (p int) clustered by (a) " +
"into 2 buckets stored as orc TBLPROPERTIES ('transactional'='false')");
checkCmdOnDriver(cpr);
- txnid1 = txnMgr.openTxn(ctx, "Fifer");
checkCmdOnDriver(driver.compileAndRespond("insert into T2 partition(p=1) values(1,3)"));
txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");
locks = getLocks();
@@ -2086,6 +2128,8 @@ public class TestDbTxnManager2 {
checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "t2", "p=1", locks);
txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ txnMgr2.openTxn(ctx, "Fidler");
+ swapTxnManager(txnMgr2);
checkCmdOnDriver(driver.compileAndRespond("show tables"));
txnMgr2.acquireLocks(driver.getPlan(), ctx, "Fidler", false);
locks = getLocks();
@@ -2093,7 +2137,7 @@ public class TestDbTxnManager2 {
checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "t2", "p=1", locks);
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", null, null, locks);
txnMgr.commitTxn();
- txnMgr2.releaseLocks(txnMgr2.getLockManager().getLocks(false, false));
+ txnMgr2.commitTxn();
Assert.assertEquals("Lock remained", 0, getLocks().size());
Assert.assertEquals("Lock remained", 0, getLocks(txnMgr2).size());
}
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index f75a1be..ec3e6ab 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.txn.compactor;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
+import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
import org.apache.hadoop.hive.metastore.api.CompactionType;
@@ -29,6 +30,8 @@ import org.apache.hadoop.hive.metastore.api.LockLevel;
import org.apache.hadoop.hive.metastore.api.LockRequest;
import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
+import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
@@ -223,10 +226,10 @@ public class TestInitiator extends CompactorTest {
LockResponse res = txnHandler.lock(req);
txnHandler.abortTxn(new AbortTxnRequest(txnid));
- for (int i = 0; i < TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50; i++) {
- txnid = openTxn();
- txnHandler.abortTxn(new AbortTxnRequest(txnid));
- }
+ conf.setIntVar(HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH, TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50);
+ OpenTxnsResponse resp = txnHandler.openTxns(new OpenTxnRequest(
+ TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50, "user", "hostname"));
+ txnHandler.abortTxns(new AbortTxnsRequest(resp.getTxn_ids()));
GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns();
Assert.assertEquals(TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50 + 1, openTxns.getOpen_txnsSize());
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/test/queries/clientpositive/row__id.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/row__id.q b/ql/src/test/queries/clientpositive/row__id.q
index a24219b..d9cb7b0 100644
--- a/ql/src/test/queries/clientpositive/row__id.q
+++ b/ql/src/test/queries/clientpositive/row__id.q
@@ -16,7 +16,7 @@ select tid from (select row__id.transactionid as tid from hello_acid) sub order
select tid from (select row__id.transactionid as tid from hello_acid) sub order by tid;
explain
-select tid from (select row__id.transactionid as tid from hello_acid) sub where tid = 1;
+select tid from (select row__id.transactionid as tid from hello_acid) sub where tid = 3;
-select tid from (select row__id.transactionid as tid from hello_acid) sub where tid = 1;
+select tid from (select row__id.transactionid as tid from hello_acid) sub where tid = 3;
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/test/results/clientpositive/acid_table_stats.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/acid_table_stats.q.out b/ql/src/test/results/clientpositive/acid_table_stats.q.out
index 1bf0a98..195278a 100644
--- a/ql/src/test/results/clientpositive/acid_table_stats.q.out
+++ b/ql/src/test/results/clientpositive/acid_table_stats.q.out
@@ -98,7 +98,7 @@ Partition Parameters:
numFiles 2
numRows 0
rawDataSize 0
- totalSize 3837
+ totalSize 3852
#### A masked pattern was here ####
# Storage Information
@@ -136,9 +136,9 @@ STAGE PLANS:
Map Operator Tree:
TableScan
alias: acid
- Statistics: Num rows: 1 Data size: 3837 Basic stats: PARTIAL Column stats: NONE
+ Statistics: Num rows: 1 Data size: 3852 Basic stats: PARTIAL Column stats: NONE
Select Operator
- Statistics: Num rows: 1 Data size: 3837 Basic stats: PARTIAL Column stats: NONE
+ Statistics: Num rows: 1 Data size: 3852 Basic stats: PARTIAL Column stats: NONE
Group By Operator
aggregations: count()
mode: hash
@@ -215,7 +215,7 @@ Partition Parameters:
numFiles 2
numRows 1000
rawDataSize 208000
- totalSize 3837
+ totalSize 3852
#### A masked pattern was here ####
# Storage Information
@@ -264,7 +264,7 @@ Partition Parameters:
numFiles 2
numRows 1000
rawDataSize 208000
- totalSize 3837
+ totalSize 3852
#### A masked pattern was here ####
# Storage Information
@@ -391,7 +391,7 @@ Partition Parameters:
numFiles 4
numRows 1000
rawDataSize 208000
- totalSize 7689
+ totalSize 7718
#### A masked pattern was here ####
# Storage Information
@@ -440,7 +440,7 @@ Partition Parameters:
numFiles 4
numRows 2000
rawDataSize 416000
- totalSize 7689
+ totalSize 7718
#### A masked pattern was here ####
# Storage Information
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/test/results/clientpositive/autoColumnStats_4.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/autoColumnStats_4.q.out b/ql/src/test/results/clientpositive/autoColumnStats_4.q.out
index 3ae2f20..101cc63 100644
--- a/ql/src/test/results/clientpositive/autoColumnStats_4.q.out
+++ b/ql/src/test/results/clientpositive/autoColumnStats_4.q.out
@@ -201,7 +201,7 @@ Table Parameters:
numFiles 2
numRows 0
rawDataSize 0
- totalSize 1714
+ totalSize 1724
transactional true
#### A masked pattern was here ####
@@ -244,7 +244,7 @@ Table Parameters:
numFiles 4
numRows 0
rawDataSize 0
- totalSize 2719
+ totalSize 2763
transactional true
#### A masked pattern was here ####
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/test/results/clientpositive/insert_values_orig_table_use_metadata.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/insert_values_orig_table_use_metadata.q.out b/ql/src/test/results/clientpositive/insert_values_orig_table_use_metadata.q.out
index 24db44f..fa8417b 100644
--- a/ql/src/test/results/clientpositive/insert_values_orig_table_use_metadata.q.out
+++ b/ql/src/test/results/clientpositive/insert_values_orig_table_use_metadata.q.out
@@ -308,7 +308,7 @@ Table Parameters:
numFiles 1
numRows 0
rawDataSize 0
- totalSize 1508
+ totalSize 1512
transactional true
#### A masked pattern was here ####
@@ -336,9 +336,9 @@ STAGE PLANS:
Map Operator Tree:
TableScan
alias: acid_ivot
- Statistics: Num rows: 1 Data size: 1508 Basic stats: COMPLETE Column stats: COMPLETE
+ Statistics: Num rows: 1 Data size: 1512 Basic stats: COMPLETE Column stats: COMPLETE
Select Operator
- Statistics: Num rows: 1 Data size: 1508 Basic stats: COMPLETE Column stats: COMPLETE
+ Statistics: Num rows: 1 Data size: 1512 Basic stats: COMPLETE Column stats: COMPLETE
Group By Operator
aggregations: count()
mode: hash
@@ -430,7 +430,7 @@ Table Parameters:
numFiles 2
numRows 0
rawDataSize 0
- totalSize 3016
+ totalSize 3024
transactional true
#### A masked pattern was here ####
@@ -458,9 +458,9 @@ STAGE PLANS:
Map Operator Tree:
TableScan
alias: acid_ivot
- Statistics: Num rows: 1 Data size: 3016 Basic stats: COMPLETE Column stats: COMPLETE
+ Statistics: Num rows: 1 Data size: 3024 Basic stats: COMPLETE Column stats: COMPLETE
Select Operator
- Statistics: Num rows: 1 Data size: 3016 Basic stats: COMPLETE Column stats: COMPLETE
+ Statistics: Num rows: 1 Data size: 3024 Basic stats: COMPLETE Column stats: COMPLETE
Group By Operator
aggregations: count()
mode: hash
@@ -538,7 +538,7 @@ Table Parameters:
numFiles 3
numRows 0
rawDataSize 0
- totalSize 380253
+ totalSize 380261
transactional true
#### A masked pattern was here ####
@@ -566,9 +566,9 @@ STAGE PLANS:
Map Operator Tree:
TableScan
alias: acid_ivot
- Statistics: Num rows: 1 Data size: 380253 Basic stats: COMPLETE Column stats: COMPLETE
+ Statistics: Num rows: 1 Data size: 380261 Basic stats: COMPLETE Column stats: COMPLETE
Select Operator
- Statistics: Num rows: 1 Data size: 380253 Basic stats: COMPLETE Column stats: COMPLETE
+ Statistics: Num rows: 1 Data size: 380261 Basic stats: COMPLETE Column stats: COMPLETE
Group By Operator
aggregations: count()
mode: hash
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/test/results/clientpositive/llap/acid_bucket_pruning.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/acid_bucket_pruning.q.out b/ql/src/test/results/clientpositive/llap/acid_bucket_pruning.q.out
index d05bf64..357ae7b 100644
--- a/ql/src/test/results/clientpositive/llap/acid_bucket_pruning.q.out
+++ b/ql/src/test/results/clientpositive/llap/acid_bucket_pruning.q.out
@@ -43,7 +43,7 @@ STAGE PLANS:
alias: acidtbldefault
filterExpr: (a = 1) (type: boolean)
buckets included: [1,] of 16
- Statistics: Num rows: 7972 Data size: 31889 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 7972 Data size: 31888 Basic stats: COMPLETE Column stats: NONE
GatherStats: false
Filter Operator
isSamplingPred: false
@@ -100,7 +100,7 @@ STAGE PLANS:
serialization.ddl struct acidtbldefault { i32 a}
serialization.format 1
serialization.lib org.apache.hadoop.hive.ql.io.orc.OrcSerde
- totalSize 31889
+ totalSize 31888
transactional true
transactional_properties default
#### A masked pattern was here ####
@@ -123,7 +123,7 @@ STAGE PLANS:
serialization.ddl struct acidtbldefault { i32 a}
serialization.format 1
serialization.lib org.apache.hadoop.hive.ql.io.orc.OrcSerde
- totalSize 31889
+ totalSize 31888
transactional true
transactional_properties default
#### A masked pattern was here ####
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/test/results/clientpositive/row__id.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/row__id.q.out b/ql/src/test/results/clientpositive/row__id.q.out
index 2289883..43c9b60 100644
--- a/ql/src/test/results/clientpositive/row__id.q.out
+++ b/ql/src/test/results/clientpositive/row__id.q.out
@@ -56,23 +56,23 @@ STAGE PLANS:
Map Operator Tree:
TableScan
alias: hello_acid
- Statistics: Num rows: 1 Data size: 2902 Basic stats: PARTIAL Column stats: NONE
+ Statistics: Num rows: 1 Data size: 2936 Basic stats: PARTIAL Column stats: NONE
Select Operator
expressions: ROW__ID.transactionid (type: bigint)
outputColumnNames: _col0
- Statistics: Num rows: 1 Data size: 2902 Basic stats: PARTIAL Column stats: NONE
+ Statistics: Num rows: 1 Data size: 2936 Basic stats: PARTIAL Column stats: NONE
Reduce Output Operator
key expressions: _col0 (type: bigint)
sort order: +
- Statistics: Num rows: 1 Data size: 2902 Basic stats: PARTIAL Column stats: NONE
+ Statistics: Num rows: 1 Data size: 2936 Basic stats: PARTIAL Column stats: NONE
Reduce Operator Tree:
Select Operator
expressions: KEY.reducesinkkey0 (type: bigint)
outputColumnNames: _col0
- Statistics: Num rows: 1 Data size: 2902 Basic stats: PARTIAL Column stats: NONE
+ Statistics: Num rows: 1 Data size: 2936 Basic stats: PARTIAL Column stats: NONE
File Output Operator
compressed: false
- Statistics: Num rows: 1 Data size: 2902 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 1 Data size: 2936 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
@@ -98,14 +98,14 @@ POSTHOOK: Input: default@hello_acid@load_date=2016-03-01
POSTHOOK: Input: default@hello_acid@load_date=2016-03-02
POSTHOOK: Input: default@hello_acid@load_date=2016-03-03
#### A masked pattern was here ####
-1
-2
3
+4
+5
PREHOOK: query: explain
-select tid from (select row__id.transactionid as tid from hello_acid) sub where tid = 1
+select tid from (select row__id.transactionid as tid from hello_acid) sub where tid = 3
PREHOOK: type: QUERY
POSTHOOK: query: explain
-select tid from (select row__id.transactionid as tid from hello_acid) sub where tid = 1
+select tid from (select row__id.transactionid as tid from hello_acid) sub where tid = 3
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
Stage-1 is a root stage
@@ -117,17 +117,17 @@ STAGE PLANS:
Map Operator Tree:
TableScan
alias: hello_acid
- Statistics: Num rows: 1 Data size: 2902 Basic stats: PARTIAL Column stats: NONE
+ Statistics: Num rows: 1 Data size: 2936 Basic stats: PARTIAL Column stats: NONE
Filter Operator
- predicate: (ROW__ID.transactionid = 1) (type: boolean)
- Statistics: Num rows: 1 Data size: 2902 Basic stats: COMPLETE Column stats: NONE
+ predicate: (ROW__ID.transactionid = 3) (type: boolean)
+ Statistics: Num rows: 1 Data size: 2936 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: ROW__ID.transactionid (type: bigint)
outputColumnNames: _col0
- Statistics: Num rows: 1 Data size: 2902 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 1 Data size: 2936 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
- Statistics: Num rows: 1 Data size: 2902 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 1 Data size: 2936 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
@@ -139,18 +139,18 @@ STAGE PLANS:
Processor Tree:
ListSink
-PREHOOK: query: select tid from (select row__id.transactionid as tid from hello_acid) sub where tid = 1
+PREHOOK: query: select tid from (select row__id.transactionid as tid from hello_acid) sub where tid = 3
PREHOOK: type: QUERY
PREHOOK: Input: default@hello_acid
PREHOOK: Input: default@hello_acid@load_date=2016-03-01
PREHOOK: Input: default@hello_acid@load_date=2016-03-02
PREHOOK: Input: default@hello_acid@load_date=2016-03-03
#### A masked pattern was here ####
-POSTHOOK: query: select tid from (select row__id.transactionid as tid from hello_acid) sub where tid = 1
+POSTHOOK: query: select tid from (select row__id.transactionid as tid from hello_acid) sub where tid = 3
POSTHOOK: type: QUERY
POSTHOOK: Input: default@hello_acid
POSTHOOK: Input: default@hello_acid@load_date=2016-03-01
POSTHOOK: Input: default@hello_acid@load_date=2016-03-02
POSTHOOK: Input: default@hello_acid@load_date=2016-03-03
#### A masked pattern was here ####
-1
+3
[3/3] hive git commit: HIVE-12636 Ensure that all queries (with
DbTxnManager) run in a transaction (Eugene Koifman, reviewed by Wei Zheng)
Posted by ek...@apache.org.
HIVE-12636 Ensure that all queries (with DbTxnManager) run in a transaction (Eugene Koifman, reviewed by Wei Zheng)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/21909601
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/21909601
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/21909601
Branch: refs/heads/master
Commit: 21909601f8f5f9d8325774178aaaa8fb3c26a764
Parents: 41c3832
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Mon May 1 09:43:27 2017 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Mon May 1 09:43:27 2017 -0700
----------------------------------------------------------------------
.../hive/hcatalog/streaming/TestStreaming.java | 40 +--
.../hive/ql/txn/compactor/TestCompactor.java | 80 ++---
.../hadoop/hive/metastore/txn/TxnHandler.java | 13 +-
.../hadoop/hive/metastore/txn/TxnUtils.java | 4 +
.../java/org/apache/hadoop/hive/ql/Context.java | 7 +
.../java/org/apache/hadoop/hive/ql/Driver.java | 204 ++++++-----
.../org/apache/hadoop/hive/ql/ErrorMsg.java | 14 +-
.../org/apache/hadoop/hive/ql/QueryPlan.java | 28 +-
.../hadoop/hive/ql/lockmgr/DbTxnManager.java | 282 +++++++++++++---
.../hadoop/hive/ql/lockmgr/HiveTxnManager.java | 16 +-
.../hive/ql/lockmgr/HiveTxnManagerImpl.java | 20 +-
.../hive/ql/parse/ExplainSemanticAnalyzer.java | 6 +-
.../hive/ql/parse/SemanticAnalyzerFactory.java | 23 +-
.../hadoop/hive/ql/plan/HiveOperation.java | 37 +-
.../hadoop/hive/ql/session/SessionState.java | 15 +
.../hive/metastore/txn/TestTxnHandler.java | 2 +-
.../org/apache/hadoop/hive/ql/TestErrorMsg.java | 6 +-
.../apache/hadoop/hive/ql/TestTxnCommands.java | 48 +--
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 21 +-
.../ql/TestTxnCommands2WithSplitUpdate.java | 20 +-
.../hive/ql/lockmgr/TestDbTxnManager.java | 65 ++--
.../hive/ql/lockmgr/TestDbTxnManager2.java | 336 +++++++++++--------
.../hive/ql/txn/compactor/TestInitiator.java | 11 +-
ql/src/test/queries/clientpositive/row__id.q | 4 +-
.../clientpositive/acid_table_stats.q.out | 14 +-
.../clientpositive/autoColumnStats_4.q.out | 4 +-
.../insert_values_orig_table_use_metadata.q.out | 18 +-
.../llap/acid_bucket_pruning.q.out | 6 +-
.../test/results/clientpositive/row__id.q.out | 34 +-
29 files changed, 840 insertions(+), 538 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
----------------------------------------------------------------------
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 8ea58e6..097de9b 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -741,7 +741,7 @@ public class TestStreaming {
txnBatch.write("1,Hello streaming".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
, txnBatch.getCurrentTransactionState());
@@ -753,11 +753,11 @@ public class TestStreaming {
txnBatch.write("2,Welcome to streaming".getBytes());
// data should not be visible
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
txnBatch.commit();
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}",
"{2, Welcome to streaming}");
txnBatch.close();
@@ -809,7 +809,7 @@ public class TestStreaming {
txnBatch.write("1,Hello streaming".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
, txnBatch.getCurrentTransactionState());
@@ -821,11 +821,11 @@ public class TestStreaming {
txnBatch.write("2,Welcome to streaming".getBytes());
// data should not be visible
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
txnBatch.commit();
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}",
"{2, Welcome to streaming}");
txnBatch.close();
@@ -871,7 +871,7 @@ public class TestStreaming {
txnBatch.write(rec1.getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
, txnBatch.getCurrentTransactionState());
@@ -998,7 +998,7 @@ public class TestStreaming {
txnBatch.write("2,Welcome to streaming".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 14, 23, 1, 1, "{1, Hello streaming}",
"{2, Welcome to streaming}");
txnBatch.close();
@@ -1017,13 +1017,13 @@ public class TestStreaming {
txnBatch.write("1,Hello streaming".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}");
txnBatch.beginNextTransaction();
txnBatch.write("2,Welcome to streaming".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 15, 24, 1, 1, "{1, Hello streaming}",
"{2, Welcome to streaming}");
txnBatch.close();
@@ -1034,14 +1034,14 @@ public class TestStreaming {
txnBatch.write("3,Hello streaming - once again".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 15, 34, 1, 2, "{1, Hello streaming}",
"{2, Welcome to streaming}", "{3, Hello streaming - once again}");
txnBatch.beginNextTransaction();
txnBatch.write("4,Welcome to streaming - once again".getBytes());
txnBatch.commit();
- checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 15, 34, 1, 2, "{1, Hello streaming}",
"{2, Welcome to streaming}", "{3, Hello streaming - once again}",
"{4, Welcome to streaming - once again}");
@@ -1078,11 +1078,11 @@ public class TestStreaming {
txnBatch2.commit();
- checkDataWritten(partLoc, 11, 20, 1, 1, "{3, Hello streaming - once again}");
+ checkDataWritten(partLoc, 24, 33, 1, 1, "{3, Hello streaming - once again}");
txnBatch1.commit();
- checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
+ checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
txnBatch1.beginNextTransaction();
txnBatch1.write("2,Welcome to streaming".getBytes());
@@ -1090,17 +1090,17 @@ public class TestStreaming {
txnBatch2.beginNextTransaction();
txnBatch2.write("4,Welcome to streaming - once again".getBytes());
- checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
+ checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
txnBatch1.commit();
- checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}",
"{2, Welcome to streaming}",
"{3, Hello streaming - once again}");
txnBatch2.commit();
- checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 14, 33, 1, 2, "{1, Hello streaming}",
"{2, Welcome to streaming}",
"{3, Hello streaming - once again}",
"{4, Welcome to streaming - once again}");
@@ -1769,7 +1769,7 @@ public class TestStreaming {
txnBatch.heartbeat();//this is no-op on closed batch
txnBatch.abort();//ditto
GetOpenTxnsInfoResponse r = msClient.showTxns();
- Assert.assertEquals("HWM didn't match", 2, r.getTxn_high_water_mark());
+ Assert.assertEquals("HWM didn't match", 17, r.getTxn_high_water_mark());
List<TxnInfo> ti = r.getOpen_txns();
Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState());
Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState());
@@ -1833,7 +1833,7 @@ public class TestStreaming {
expectedEx != null && expectedEx.getMessage().contains("has been closed()"));
r = msClient.showTxns();
- Assert.assertEquals("HWM didn't match", 4, r.getTxn_high_water_mark());
+ Assert.assertEquals("HWM didn't match", 19, r.getTxn_high_water_mark());
ti = r.getOpen_txns();
Assert.assertEquals("wrong status ti(0)", TxnState.ABORTED, ti.get(0).getState());
Assert.assertEquals("wrong status ti(1)", TxnState.ABORTED, ti.get(1).getState());
@@ -1856,7 +1856,7 @@ public class TestStreaming {
expectedEx != null && expectedEx.getMessage().contains("Simulated fault occurred"));
r = msClient.showTxns();
- Assert.assertEquals("HWM didn't match", 6, r.getTxn_high_water_mark());
+ Assert.assertEquals("HWM didn't match", 21, r.getTxn_high_water_mark());
ti = r.getOpen_txns();
Assert.assertEquals("wrong status ti(3)", TxnState.ABORTED, ti.get(3).getState());
Assert.assertEquals("wrong status ti(4)", TxnState.ABORTED, ti.get(4).getState());
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 66ed8ca..f92db7c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -659,17 +659,17 @@ public class TestCompactor {
Path resultFile = null;
for (int i = 0; i < names.length; i++) {
names[i] = stat[i].getPath().getName();
- if (names[i].equals("delta_0000001_0000004")) {
+ if (names[i].equals("delta_0000003_0000006")) {
resultFile = stat[i].getPath();
}
}
Arrays.sort(names);
- String[] expected = new String[]{"delta_0000001_0000002",
- "delta_0000001_0000004", "delta_0000003_0000004", "delta_0000005_0000006"};
+ String[] expected = new String[]{"delta_0000003_0000004",
+ "delta_0000003_0000006", "delta_0000005_0000006", "delta_0000007_0000008"};
if (!Arrays.deepEquals(expected, names)) {
Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
}
- checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty, 0, 1L, 4L);
+ checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty, 0, 3L, 6L);
} finally {
connection.close();
@@ -718,11 +718,11 @@ public class TestCompactor {
FileStatus[] stat =
fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.baseFileFilter);
if (1 != stat.length) {
- Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat));
+ Assert.fail("Expecting 1 file \"base_0000006\" and found " + stat.length + " files " + Arrays.toString(stat));
}
String name = stat[0].getPath().getName();
- Assert.assertEquals(name, "base_0000004");
- checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L);
+ Assert.assertEquals(name, "base_0000006");
+ checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L);
} finally {
connection.close();
}
@@ -778,17 +778,17 @@ public class TestCompactor {
Path resultDelta = null;
for (int i = 0; i < names.length; i++) {
names[i] = stat[i].getPath().getName();
- if (names[i].equals("delta_0000001_0000004")) {
+ if (names[i].equals("delta_0000003_0000006")) {
resultDelta = stat[i].getPath();
}
}
Arrays.sort(names);
- String[] expected = new String[]{"delta_0000001_0000002",
- "delta_0000001_0000004", "delta_0000003_0000004"};
+ String[] expected = new String[]{"delta_0000003_0000004",
+ "delta_0000003_0000006", "delta_0000005_0000006"};
if (!Arrays.deepEquals(expected, names)) {
Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
}
- checkExpectedTxnsPresent(null, new Path[]{resultDelta}, columnNamesProperty, columnTypesProperty, 0, 1L, 4L);
+ checkExpectedTxnsPresent(null, new Path[]{resultDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 6L);
} finally {
connection.close();
}
@@ -844,13 +844,13 @@ public class TestCompactor {
Assert.fail("majorCompactAfterAbort FileStatus[] stat " + Arrays.toString(stat));
}
if (1 != stat.length) {
- Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat));
+ Assert.fail("Expecting 1 file \"base_0000006\" and found " + stat.length + " files " + Arrays.toString(stat));
}
String name = stat[0].getPath().getName();
- if (!name.equals("base_0000004")) {
- Assert.fail("majorCompactAfterAbort name " + name + " not equals to base_0000004");
+ if (!name.equals("base_0000006")) {
+ Assert.fail("majorCompactAfterAbort name " + name + " not equals to base_0000006");
}
- checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L);
+ checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L);
} finally {
connection.close();
}
@@ -899,11 +899,11 @@ public class TestCompactor {
FileStatus[] stat =
fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.baseFileFilter);
if (1 != stat.length) {
- Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat));
+ Assert.fail("Expecting 1 file \"base_0000006\" and found " + stat.length + " files " + Arrays.toString(stat));
}
String name = stat[0].getPath().getName();
- Assert.assertEquals(name, "base_0000004");
- checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L);
+ Assert.assertEquals(name, "base_0000006");
+ checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L);
} finally {
connection.close();
}
@@ -923,18 +923,18 @@ public class TestCompactor {
" STORED AS ORC TBLPROPERTIES ('transactional'='true',"
+ "'transactional_properties'='default')", driver);
- // Insert some data -> this will generate only insert deltas and no delete deltas: delta_1_1
+ // Insert some data -> this will generate only insert deltas and no delete deltas: delta_3_3
executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(1, 'foo')", driver);
- // Insert some data -> this will again generate only insert deltas and no delete deltas: delta_2_2
+ // Insert some data -> this will again generate only insert deltas and no delete deltas: delta_4_4
executeStatementOnDriver("INSERT INTO " + tblName +"(a,b) VALUES(2, 'bar')", driver);
- // Delete some data -> this will generate only delete deltas and no insert deltas: delete_delta_3_3
+ // Delete some data -> this will generate only delete deltas and no insert deltas: delete_delta_5_5
executeStatementOnDriver("DELETE FROM " + tblName +" WHERE a = 2", driver);
// Now, compact -> Compaction produces a single range for both delta and delete delta
- // That is, both delta and delete_deltas would be compacted into delta_1_3 and delete_delta_1_3
- // even though there are only two delta_1_1, delta_2_2 and one delete_delta_3_3.
+ // That is, both delta and delete_deltas would be compacted into delta_3_5 and delete_delta_3_5
+ // even though there are only two delta_3_3, delta_4_4 and one delete_delta_5_5.
TxnStore txnHandler = TxnUtils.getTxnStore(conf);
txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR));
Worker t = new Worker();
@@ -957,16 +957,16 @@ public class TestCompactor {
Path minorCompactedDelta = null;
for (int i = 0; i < deltas.length; i++) {
deltas[i] = stat[i].getPath().getName();
- if (deltas[i].equals("delta_0000001_0000003")) {
+ if (deltas[i].equals("delta_0000003_0000005")) {
minorCompactedDelta = stat[i].getPath();
}
}
Arrays.sort(deltas);
- String[] expectedDeltas = new String[]{"delta_0000001_0000001_0000", "delta_0000001_0000003", "delta_0000002_0000002_0000"};
+ String[] expectedDeltas = new String[]{"delta_0000003_0000003_0000", "delta_0000003_0000005", "delta_0000004_0000004_0000"};
if (!Arrays.deepEquals(expectedDeltas, deltas)) {
Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas));
}
- checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 1L, 2L);
+ checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 4L);
// Verify that we have got correct set of delete_deltas.
FileStatus[] deleteDeltaStat =
@@ -975,16 +975,16 @@ public class TestCompactor {
Path minorCompactedDeleteDelta = null;
for (int i = 0; i < deleteDeltas.length; i++) {
deleteDeltas[i] = deleteDeltaStat[i].getPath().getName();
- if (deleteDeltas[i].equals("delete_delta_0000001_0000003")) {
+ if (deleteDeltas[i].equals("delete_delta_0000003_0000005")) {
minorCompactedDeleteDelta = deleteDeltaStat[i].getPath();
}
}
Arrays.sort(deleteDeltas);
- String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000003", "delete_delta_0000003_0000003_0000"};
+ String[] expectedDeleteDeltas = new String[]{"delete_delta_0000003_0000005", "delete_delta_0000005_0000005_0000"};
if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
}
- checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 2L, 2L);
+ checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 4L, 4L);
}
@Test
@@ -1034,16 +1034,16 @@ public class TestCompactor {
Path minorCompactedDelta = null;
for (int i = 0; i < deltas.length; i++) {
deltas[i] = stat[i].getPath().getName();
- if (deltas[i].equals("delta_0000001_0000002")) {
+ if (deltas[i].equals("delta_0000003_0000004")) {
minorCompactedDelta = stat[i].getPath();
}
}
Arrays.sort(deltas);
- String[] expectedDeltas = new String[]{"delta_0000001_0000001_0000", "delta_0000001_0000002", "delta_0000002_0000002_0000"};
+ String[] expectedDeltas = new String[]{"delta_0000003_0000003_0000", "delta_0000003_0000004", "delta_0000004_0000004_0000"};
if (!Arrays.deepEquals(expectedDeltas, deltas)) {
Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas));
}
- checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 1L, 2L);
+ checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 4L);
// Verify that we have got correct set of delete_deltas.
FileStatus[] deleteDeltaStat =
@@ -1052,12 +1052,12 @@ public class TestCompactor {
Path minorCompactedDeleteDelta = null;
for (int i = 0; i < deleteDeltas.length; i++) {
deleteDeltas[i] = deleteDeltaStat[i].getPath().getName();
- if (deleteDeltas[i].equals("delete_delta_0000001_0000002")) {
+ if (deleteDeltas[i].equals("delete_delta_0000003_0000004")) {
minorCompactedDeleteDelta = deleteDeltaStat[i].getPath();
}
}
Arrays.sort(deleteDeltas);
- String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000002"};
+ String[] expectedDeleteDeltas = new String[]{"delete_delta_0000003_0000004"};
if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
}
@@ -1111,17 +1111,17 @@ public class TestCompactor {
Path resultFile = null;
for (int i = 0; i < names.length; i++) {
names[i] = stat[i].getPath().getName();
- if (names[i].equals("delta_0000001_0000004")) {
+ if (names[i].equals("delta_0000003_0000006")) {
resultFile = stat[i].getPath();
}
}
Arrays.sort(names);
- String[] expected = new String[]{"delta_0000001_0000002",
- "delta_0000001_0000004", "delta_0000003_0000004", "delta_0000005_0000006"};
+ String[] expected = new String[]{"delta_0000003_0000004",
+ "delta_0000003_0000006", "delta_0000005_0000006", "delta_0000007_0000008"};
if (!Arrays.deepEquals(expected, names)) {
Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names));
}
- checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty, 0, 1L, 4L);
+ checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty, 0, 3L, 6L);
// Verify that we have got correct set of delete_deltas also
FileStatus[] deleteDeltaStat =
@@ -1130,12 +1130,12 @@ public class TestCompactor {
Path minorCompactedDeleteDelta = null;
for (int i = 0; i < deleteDeltas.length; i++) {
deleteDeltas[i] = deleteDeltaStat[i].getPath().getName();
- if (deleteDeltas[i].equals("delete_delta_0000001_0000004")) {
+ if (deleteDeltas[i].equals("delete_delta_0000003_0000006")) {
minorCompactedDeleteDelta = deleteDeltaStat[i].getPath();
}
}
Arrays.sort(deleteDeltas);
- String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000004"};
+ String[] expectedDeleteDeltas = new String[]{"delete_delta_0000003_0000006"};
if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas));
}
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index e138838..12d98c5 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -855,7 +855,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
/**
* As much as possible (i.e. in absence of retries) we want both operations to be done on the same
* connection (but separate transactions). This avoid some flakiness in BONECP where if you
- * perform an operation on 1 connection and immediately get another fron the pool, the 2nd one
+ * perform an operation on 1 connection and immediately get another from the pool, the 2nd one
* doesn't see results of the first.
*
* Retry-by-caller note: If the call to lock is from a transaction, then in the worst case
@@ -994,6 +994,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
case SELECT:
updateTxnComponents = false;
break;
+ case NO_TXN:
+ /*this constant is a bit of a misnomer since we now always have a txn context. It
+ just means the operation is such that we don't care what tables/partitions it
+ affected as it doesn't trigger a compaction or conflict detection. A better name
+ would be NON_TRANSACTIONAL.*/
+ updateTxnComponents = false;
+ break;
default:
//since we have an open transaction, only 4 values above are expected
throw new IllegalStateException("Unexpected DataOperationType: " + lc.getOperationType()
@@ -2471,14 +2478,14 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
response.setLockid(extLockId);
LOG.debug("checkLock(): Setting savepoint. extLockId=" + JavaUtils.lockIdToString(extLockId));
- Savepoint save = dbConn.setSavepoint();//todo: get rid of this
+ Savepoint save = dbConn.setSavepoint();
StringBuilder query = new StringBuilder("select hl_lock_ext_id, " +
"hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " +
"hl_lock_type, hl_txnid from HIVE_LOCKS where hl_db in (");
Set<String> strings = new HashSet<String>(locksBeingChecked.size());
- //This the set of entities that the statement represnted by extLockId wants to update
+ //This is the set of entities that the statement represented by extLockId wants to update
List<LockInfo> writeSet = new ArrayList<>();
for (LockInfo info : locksBeingChecked) {
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index 517eec3..2df88fd 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -50,6 +50,10 @@ public class TxnUtils {
* @return a valid txn list.
*/
public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long currentTxn) {
+ /*todo: should highWater be min(currentTxn,txns.getTxn_high_water_mark()) assuming currentTxn>0
+ * otherwise if currentTxn=7 and 8 commits before 7, then 7 will see result of 8 which
+ * doesn't make sense for Snapshot Isolation. Of course for Read Committed, the list should
+ * include the latest committed set.*/
long highWater = txns.getTxn_high_water_mark();
Set<Long> open = txns.getOpen_txns();
long[] exceptions = new long[open.size() - (currentTxn > 0 ? 1 : 0)];
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/java/org/apache/hadoop/hive/ql/Context.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 08bba3d..fdcf052 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -948,6 +948,13 @@ public class Context {
public ExplainConfiguration getExplainConfig() {
return explainConfig;
}
+ private boolean isExplainPlan = false;
+ public boolean isExplainPlan() {
+ return isExplainPlan;
+ }
+ public void setExplainPlan(boolean t) {
+ this.isExplainPlan = t;
+ }
public void setExplainConfig(ExplainConfiguration explainConfig) {
this.explainConfig = explainConfig;
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 16b8101..d32f313 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -37,9 +37,12 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.metrics.common.Metrics;
import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
@@ -154,11 +157,6 @@ public class Driver implements CommandProcessor {
private FetchTask fetchTask;
List<HiveLock> hiveLocks = new ArrayList<HiveLock>();
- // A list of FileSinkOperators writing in an ACID compliant manner
- private Set<FileSinkDesc> acidSinks;
- // whether any ACID table is involved in a query
- private boolean acidInQuery;
-
// A limit on the number of threads that can be launched
private int maxthreads;
private int tryCount = Integer.MAX_VALUE;
@@ -408,7 +406,7 @@ public class Driver implements CommandProcessor {
// deferClose indicates if the close/destroy should be deferred when the process has been
// interrupted, it should be set to true if the compile is called within another method like
// runInternal, which defers the close to the called in that method.
- public int compile(String command, boolean resetTaskIds, boolean deferClose) {
+ private int compile(String command, boolean resetTaskIds, boolean deferClose) {
PerfLogger perfLogger = SessionState.getPerfLogger(true);
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_RUN);
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.COMPILE);
@@ -525,6 +523,15 @@ public class Driver implements CommandProcessor {
// because at that point we need access to the objects.
Hive.get().getMSC().flushCache();
+ if(checkConcurrency() && startImplicitTxn(txnManager)) {
+ String userFromUGI = getUserFromUGI();
+ if (!txnManager.isTxnOpen()) {
+ if(userFromUGI == null) {
+ return 10;
+ }
+ long txnid = txnManager.openTxn(ctx, userFromUGI);
+ }
+ }
// Do semantic analysis and plan generation
if (saHooks != null && !saHooks.isEmpty()) {
HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
@@ -543,15 +550,10 @@ public class Driver implements CommandProcessor {
} else {
sem.analyze(tree, ctx);
}
- // Record any ACID compliant FileSinkOperators we saw so we can add our transaction ID to
- // them later.
- acidSinks = sem.getAcidFileSinks();
-
LOG.info("Semantic Analysis Completed");
// validate the plan
sem.validate();
- acidInQuery = sem.hasAcidInQuery();
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ANALYZE);
if (isInterrupted()) {
@@ -669,7 +671,39 @@ public class Driver implements CommandProcessor {
}
}
-
+ private boolean startImplicitTxn(HiveTxnManager txnManager) throws LockException {
+ boolean shouldOpenImplicitTxn = !ctx.isExplainPlan();
+ //this is dumb. HiveOperation is not always set. see HIVE-16447/HIVE-16443
+ switch (queryState.getHiveOperation() == null ? HiveOperation.QUERY : queryState.getHiveOperation()) {
+ case COMMIT:
+ case ROLLBACK:
+ if(!txnManager.isTxnOpen()) {
+ throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, queryState.getHiveOperation().getOperationName());
+ }
+ case SWITCHDATABASE:
+ case SET_AUTOCOMMIT:
+ /**
+ * autocommit is here for completeness. TM doesn't use it. If we want to support JDBC
+ * semantics (or any other definition of autocommit) it should be done at session level.
+ */
+ case SHOWDATABASES:
+ case SHOWTABLES:
+ case SHOWCOLUMNS:
+ case SHOWFUNCTIONS:
+ case SHOWINDEXES:
+ case SHOWPARTITIONS:
+ case SHOWLOCKS:
+ case SHOWVIEWS:
+ case SHOW_ROLES:
+ case SHOW_ROLE_PRINCIPALS:
+ case SHOW_COMPACTIONS:
+ case SHOW_TRANSACTIONS:
+ case ABORT_TRANSACTIONS:
+ shouldOpenImplicitTxn = false;
+ //this implies that no locks are needed for such a command
+ }
+ return shouldOpenImplicitTxn;
+ }
private int handleInterruption(String msg) {
return handleInterruptionWithHook(msg, null, null);
}
@@ -1083,8 +1117,17 @@ public class Driver implements CommandProcessor {
// Write the current set of valid transactions into the conf file so that it can be read by
// the input format.
private void recordValidTxns() throws LockException {
+ ValidTxnList oldList = null;
+ String s = conf.get(ValidTxnList.VALID_TXNS_KEY);
+ if(s != null && s.length() > 0) {
+ oldList = new ValidReadTxnList(s);
+ }
HiveTxnManager txnMgr = SessionState.get().getTxnMgr();
ValidTxnList txns = txnMgr.getValidTxns();
+ if(oldList != null) {
+ throw new IllegalStateException("calling recordValidTxn() more than once in the same " +
+ JavaUtils.txnIdToString(txnMgr.getCurrentTxnId()));
+ }
String txnStr = txns.toString();
conf.set(ValidTxnList.VALID_TXNS_KEY, txnStr);
if(plan.getFetchTask() != null) {
@@ -1098,79 +1141,61 @@ public class Driver implements CommandProcessor {
LOG.debug("Encoding valid txns info " + txnStr + " txnid:" + txnMgr.getCurrentTxnId());
}
+ private String getUserFromUGI() {
+ // Don't use the userName member, as it may or may not have been set. Get the value from
+ // conf, which calls into getUGI to figure out who the process is running as.
+ try {
+ return conf.getUser();
+ } catch (IOException e) {
+ errorMessage = "FAILED: Error in determining user while acquiring locks: " + e.getMessage();
+ SQLState = ErrorMsg.findSQLState(e.getMessage());
+ downstreamError = e;
+ console.printError(errorMessage,
+ "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
+ }
+ return null;
+ }
/**
* Acquire read and write locks needed by the statement. The list of objects to be locked are
- * obtained from the inputs and outputs populated by the compiler. The lock acquisition scheme is
- * pretty simple. If all the locks cannot be obtained, error out. Deadlock is avoided by making
- * sure that the locks are lexicographically sorted.
+ * obtained from the inputs and outputs populated by the compiler. Locking strategy depends on
+ * HiveTxnManager and HiveLockManager configured
*
* This method also records the list of valid transactions. This must be done after any
- * transactions have been opened and locks acquired.
- * @param startTxnImplicitly in AC=false, the 1st DML starts a txn
+ * transactions have been opened.
**/
- private int acquireLocksAndOpenTxn(boolean startTxnImplicitly) {
+ private int acquireLocks() {
PerfLogger perfLogger = SessionState.getPerfLogger();
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ACQUIRE_READ_WRITE_LOCKS);
SessionState ss = SessionState.get();
HiveTxnManager txnMgr = ss.getTxnMgr();
- if(startTxnImplicitly) {
- assert !txnMgr.getAutoCommit();
+ if(!txnMgr.isTxnOpen() && txnMgr.supportsAcid()) {
+ /*non acid txn managers don't support txns but fwd lock requests to lock managers
+ acid txn manager requires all locks to be associated with a txn so if we
+ end up here w/o an open txn it's because we are processing something like "use <database>"
+ which by definition needs no locks*/
+ return 0;
}
-
try {
- // Don't use the userName member, as it may or may not have been set. Get the value from
- // conf, which calls into getUGI to figure out who the process is running as.
- String userFromUGI;
- try {
- userFromUGI = conf.getUser();
- } catch (IOException e) {
- errorMessage = "FAILED: Error in determining user while acquiring locks: " + e.getMessage();
- SQLState = ErrorMsg.findSQLState(e.getMessage());
- downstreamError = e;
- console.printError(errorMessage,
- "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
+ String userFromUGI = getUserFromUGI();
+ if(userFromUGI == null) {
return 10;
}
-
- boolean initiatingTransaction = false;
- boolean readOnlyQueryInAutoCommit = false;
- if((txnMgr.getAutoCommit() && haveAcidWrite()) || plan.getOperation() == HiveOperation.START_TRANSACTION ||
- (!txnMgr.getAutoCommit() && startTxnImplicitly)) {
- if(txnMgr.isTxnOpen()) {
- throw new RuntimeException("Already have an open transaction txnid:" + txnMgr.getCurrentTxnId());
- }
- // We are writing to tables in an ACID compliant way, so we need to open a transaction
- txnMgr.openTxn(ctx, userFromUGI);
- initiatingTransaction = true;
- }
- else {
- readOnlyQueryInAutoCommit = txnMgr.getAutoCommit() && plan.getOperation() == HiveOperation.QUERY && !haveAcidWrite();
- }
// Set the transaction id in all of the acid file sinks
if (haveAcidWrite()) {
- for (FileSinkDesc desc : acidSinks) {
+ for (FileSinkDesc desc : plan.getAcidSinks()) {
desc.setTransactionId(txnMgr.getCurrentTxnId());
//it's possible to have > 1 FileSink writing to the same table/partition
//e.g. Merge stmt, multi-insert stmt when mixing DP and SP writes
desc.setStatementId(txnMgr.getWriteIdAndIncrement());
}
}
- /*Note, we have to record snapshot after lock acquisition to prevent lost update problem
- consider 2 concurrent "update table T set x = x + 1". 1st will get the locks and the
- 2nd will block until 1st one commits and only then lock in the snapshot, i.e. it will
- see the changes made by 1st one. This takes care of autoCommit=true case.
- For multi-stmt txns this is not sufficient and will be managed via WriteSet tracking
- in the lock manager.*/
+ /*It's imperative that {@code acquireLocks()} is called for all commands so that
+ HiveTxnManager can transition its state machine correctly*/
txnMgr.acquireLocks(plan, ctx, userFromUGI, lDrvState);
- if(initiatingTransaction || (readOnlyQueryInAutoCommit && acidInQuery)) {
- //For multi-stmt txns we should record the snapshot when txn starts but
- // don't update it after that until txn completes. Thus the check for {@code initiatingTransaction}
- //For autoCommit=true, Read-only statements, txn is implicit, i.e. lock in the snapshot
- //for each statement.
+ if(txnMgr.recordSnapshot(plan)) {
recordValidTxns();
}
-
return 0;
} catch (Exception e) {
errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage();
@@ -1185,7 +1210,7 @@ public class Driver implements CommandProcessor {
}
private boolean haveAcidWrite() {
- return acidSinks != null && !acidSinks.isEmpty();
+ return !plan.getAcidSinks().isEmpty();
}
/**
* @param commit if there is an open transaction and if true, commit,
@@ -1193,11 +1218,11 @@ public class Driver implements CommandProcessor {
* @param txnManager an optional existing transaction manager retrieved earlier from the session
*
**/
- private void releaseLocksAndCommitOrRollback(boolean commit, HiveTxnManager txnManager)
+ @VisibleForTesting
+ public void releaseLocksAndCommitOrRollback(boolean commit, HiveTxnManager txnManager)
throws LockException {
PerfLogger perfLogger = SessionState.getPerfLogger();
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RELEASE_LOCKS);
-
HiveTxnManager txnMgr;
if (txnManager == null) {
SessionState ss = SessionState.get();
@@ -1207,6 +1232,7 @@ public class Driver implements CommandProcessor {
}
// If we've opened a transaction we need to commit or rollback rather than explicitly
// releasing the locks.
+ conf.unset(ValidTxnList.VALID_TXNS_KEY);
if (txnMgr.isTxnOpen()) {
if (commit) {
if(conf.getBoolVar(ConfVars.HIVE_IN_TEST) && conf.getBoolVar(ConfVars.HIVETESTMODEROLLBACKTXN)) {
@@ -1492,52 +1518,12 @@ public class Driver implements CommandProcessor {
HiveTxnManager txnManager = SessionState.get().getTxnMgr();
ctx.setHiveTxnManager(txnManager);
- boolean startTxnImplicitly = false;
- {
- //this block ensures op makes sense in given context, e.g. COMMIT is valid only if txn is open
- //DDL is not allowed in a txn, etc.
- //an error in an open txn does a rollback of the txn
- if (txnManager.isTxnOpen() && !plan.getOperation().isAllowedInTransaction()) {
- assert !txnManager.getAutoCommit() : "didn't expect AC=true";
- return rollback(new CommandProcessorResponse(12, ErrorMsg.OP_NOT_ALLOWED_IN_TXN, null,
- plan.getOperationName(), Long.toString(txnManager.getCurrentTxnId())));
- }
- if(!txnManager.isTxnOpen() && plan.getOperation().isRequiresOpenTransaction()) {
- return rollback(new CommandProcessorResponse(12, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, null, plan.getOperationName()));
- }
- if(!txnManager.isTxnOpen() && plan.getOperation() == HiveOperation.QUERY && !txnManager.getAutoCommit()) {
- //this effectively makes START TRANSACTION optional and supports JDBC setAutoCommit(false) semantics
- //also, indirectly allows DDL to be executed outside a txn context
- startTxnImplicitly = true;
- }
- if(txnManager.getAutoCommit() && plan.getOperation() == HiveOperation.START_TRANSACTION) {
- return rollback(new CommandProcessorResponse(12, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT, null, plan.getOperationName()));
- }
- }
- if(plan.getOperation() == HiveOperation.SET_AUTOCOMMIT) {
- try {
- if(plan.getAutoCommitValue() && !txnManager.getAutoCommit()) {
- /*here, if there is an open txn, we want to commit it; this behavior matches
- * https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#setAutoCommit(boolean)*/
- releaseLocksAndCommitOrRollback(true, null);
- txnManager.setAutoCommit(true);
- }
- else if(!plan.getAutoCommitValue() && txnManager.getAutoCommit()) {
- txnManager.setAutoCommit(false);
- }
- else {/*didn't change autoCommit value - no-op*/}
- }
- catch(LockException e) {
- return handleHiveException(e, 12);
- }
- }
-
if (requiresLock()) {
// a checkpoint to see if the thread is interrupted or not before an expensive operation
if (isInterrupted()) {
ret = handleInterruption("at acquiring the lock.");
} else {
- ret = acquireLocksAndOpenTxn(startTxnImplicitly);
+ ret = acquireLocks();
}
if (ret != 0) {
return rollback(createProcessorResponse(ret));
@@ -1551,7 +1537,8 @@ public class Driver implements CommandProcessor {
//if needRequireLock is false, the release here will do nothing because there is no lock
try {
- if(txnManager.getAutoCommit() || plan.getOperation() == HiveOperation.COMMIT) {
+ //since set autocommit starts an implicit txn, close it
+ if(txnManager.isImplicitTransactionOpen() || plan.getOperation() == HiveOperation.COMMIT) {
releaseLocksAndCommitOrRollback(true, null);
}
else if(plan.getOperation() == HiveOperation.ROLLBACK) {
@@ -1678,6 +1665,13 @@ public class Driver implements CommandProcessor {
private CommandProcessorResponse createProcessorResponse(int ret) {
SessionState.getPerfLogger().cleanupPerfLogMetrics();
queryDisplay.setErrorMessage(errorMessage);
+ if(downstreamError != null && downstreamError instanceof HiveException) {
+ ErrorMsg em = ((HiveException)downstreamError).getCanonicalErrorMsg();
+ if(em != null) {
+ return new CommandProcessorResponse(ret, errorMessage, SQLState,
+ schema, downstreamError, em.getErrorCode(), null);
+ }
+ }
return new CommandProcessorResponse(ret, errorMessage, SQLState, downstreamError);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 9667d71..d01a203 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -482,9 +482,17 @@ public enum ErrorMsg {
"is controlled by hive.exec.max.dynamic.partitions and hive.exec.max.dynamic.partitions.pernode. "),
PARTITION_SCAN_LIMIT_EXCEEDED(20005, "Number of partitions scanned (={0}) on table {1} exceeds limit" +
" (={2}). This is controlled by hive.limit.query.max.table.partition.", true),
- OP_NOT_ALLOWED_IN_AUTOCOMMIT(20006, "Operation {0} is not allowed when autoCommit=true.", true),//todo: better SQLState?
- OP_NOT_ALLOWED_IN_TXN(20007, "Operation {0} is not allowed in a transaction. TransactionID={1}.", true),
- OP_NOT_ALLOWED_WITHOUT_TXN(20008, "Operation {0} is not allowed since autoCommit=false and there is no active transaction", true),
+ /**
+ * {1} is the transaction id;
+ * use {@link org.apache.hadoop.hive.common.JavaUtils#txnIdToString(long)} to format
+ */
+ OP_NOT_ALLOWED_IN_IMPLICIT_TXN(20006, "Operation {0} is not allowed in an implicit transaction ({1}).", true),
+ /**
+ * {1} is the transaction id;
+ * use {@link org.apache.hadoop.hive.common.JavaUtils#txnIdToString(long)} to format
+ */
+ OP_NOT_ALLOWED_IN_TXN(20007, "Operation {0} is not allowed in a transaction ({1},queryId={2}).", true),
+ OP_NOT_ALLOWED_WITHOUT_TXN(20008, "Operation {0} is not allowed without an active transaction", true),
//========================== 30000 range starts here ========================//
STATSPUBLISHER_NOT_OBTAINED(30000, "StatsPublisher cannot be obtained. " +
"There was a error to retrieve the StatsPublisher, and retrying " +
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
index e8c8ae6..2ddabd9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
@@ -35,6 +35,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hive.metastore.api.Schema;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.ExplainTask;
@@ -48,6 +49,7 @@ import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.ColumnAccessInfo;
import org.apache.hadoop.hive.ql.parse.TableAccessInfo;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReducerTimeStatsPerJob;
@@ -105,11 +107,19 @@ public class QueryPlan implements Serializable {
private transient Long queryStartTime;
private final HiveOperation operation;
+ private final boolean acidResourcesInQuery;
+ private final Set<FileSinkDesc> acidSinks;
private Boolean autoCommitValue;
public QueryPlan() {
- this.reducerTimeStatsPerJobList = new ArrayList<ReducerTimeStatsPerJob>();
- operation = null;
+ this(null);
+ }
+ @VisibleForTesting
+ protected QueryPlan(HiveOperation command) {
+ this.reducerTimeStatsPerJobList = new ArrayList<>();
+ this.operation = command;
+ this.acidResourcesInQuery = false;
+ this.acidSinks = Collections.emptySet();
}
public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, String queryId,
@@ -136,8 +146,22 @@ public class QueryPlan implements Serializable {
this.operation = operation;
this.autoCommitValue = sem.getAutoCommitValue();
this.resultSchema = resultSchema;
+ this.acidResourcesInQuery = sem.hasAcidInQuery();
+ this.acidSinks = sem.getAcidFileSinks();
}
+ /**
+ * @return true if any acid resources are read/written
+ */
+ public boolean hasAcidResourcesInQuery() {
+ return acidResourcesInQuery;
+ }
+ /**
+ * @return Collection of FileSinkDesc representing writes to Acid resources
+ */
+ Set<FileSinkDesc> getAcidSinks() {
+ return acidSinks;
+ }
public String getQueryStr() {
return queryString;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 62f7c5a..cdf2c40 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -21,6 +21,11 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.hive.ql.plan.LockDatabaseDesc;
+import org.apache.hadoop.hive.ql.plan.LockTableDesc;
+import org.apache.hadoop.hive.ql.plan.UnlockDatabaseDesc;
+import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
import org.apache.hive.common.util.ShutdownHookManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,6 +62,13 @@ import java.util.concurrent.atomic.AtomicInteger;
* with a single thread accessing it at a time, with the exception of {@link #heartbeat()} method.
* The later may (usually will) be called from a timer thread.
* See {@link #getMS()} for more important concurrency/metastore access notes.
+ *
+ * Each statement that the TM (transaction manager) should be aware of should belong to a transaction.
+ * Effectively, that means any statement that has side effects. Exceptions are statements like
+ * Show Compactions, Show Tables, Use Database foo, etc. The transaction is started either
+ * explicitly ( via Start Transaction SQL statement from end user - not fully supported) or
+ * implicitly by the {@link org.apache.hadoop.hive.ql.Driver} (which looks exactly as autoCommit=true
+ * from end user point of view). See more at {@link #isExplicitTransaction}.
*/
public final class DbTxnManager extends HiveTxnManagerImpl {
@@ -76,7 +88,47 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
* to keep apart multiple writes of the same data within the same transaction
* Also see {@link org.apache.hadoop.hive.ql.io.AcidOutputFormat.Options}
*/
- private int statementId = -1;
+ private int writeId = -1;
+ /**
+ * counts number of statements in the current transaction
+ */
+ private int numStatements = 0;
+ /**
+ * if {@code true} it means current transaction is started via START TRANSACTION which means it cannot
+ * include any Operations which cannot be rolled back (drop partition; write to non-acid table).
+ * If false, it's a single statement transaction which can include any statement. This is not a
+ * contradiction from the user point of view who doesn't know anything about the implicit txn
+ * and cannot call rollback (the statement of course can fail in which case there is nothing to
+ * rollback (assuming the statement is well implemented)).
+ *
+ * This is done so that all commands run in a transaction which simplifies implementation and
+ * allows a simple implementation of multi-statement txns which don't require a lock manager
+ * capable of deadlock detection. (todo: not fully implemented; elaborate on how this LM works)
+ *
+ * Also, critically important, ensuring that everything runs in a transaction assigns an order
+ * to all operations in the system - needed for replication/DR.
+ *
+ * We don't want to allow non-transactional statements in a user demarcated txn because the effect
+ * of such statement is "visible" immediately on statement completion, but the user may
+ * issue a rollback but the action of the statement can't be undone (and has possibly already been
+ * seen by another txn). For example,
+ * start transaction
+ * insert into transactional_table values(1);
+ * insert into non_transactional_table select * from transactional_table;
+ * rollback
+ *
+ * The user would be in for a surprise especially if they are not aware of transactional
+ * properties of the tables involved.
+ *
+ * As a side note: what should the lock manager do with locks for non-transactional resources?
+ * Should it release them at the end of the stmt or txn?
+ * Some interesting thoughts: http://mysqlmusings.blogspot.com/2009/02/mixing-engines-in-transactions.html
+ */
+ private boolean isExplicitTransaction = false;
+ /**
+ * To ensure transactions don't nest.
+ */
+ private int startTransactionCount = 0;
// QueryId for the query in current transaction
private String queryId;
@@ -141,15 +193,22 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
@VisibleForTesting
long openTxn(Context ctx, String user, long delay) throws LockException {
- //todo: why don't we lock the snapshot here??? Instead of having client make an explicit call
- //whenever it chooses
+ /*Q: why don't we lock the snapshot here??? Instead of having client make an explicit call
+ whenever it chooses
+ A: If we want to rely on locks for transaction scheduling we must get the snapshot after lock
+ acquisition. Relying on locks is a pessimistic strategy which works better under high
+ contention.*/
init();
+ getLockManager();
if(isTxnOpen()) {
throw new LockException("Transaction already opened. " + JavaUtils.txnIdToString(txnId));
}
try {
txnId = getMS().openTxn(user);
- statementId = 0;
+ writeId = 0;
+ numStatements = 0;
+ isExplicitTransaction = false;
+ startTransactionCount = 0;
LOG.debug("Opened " + JavaUtils.txnIdToString(txnId));
ctx.setHeartbeater(startHeartbeat(delay));
return txnId;
@@ -159,8 +218,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
}
/**
- * we don't expect multiple thread to call this method concurrently but {@link #lockMgr} will
- * be read by a different threads that one writing it, thus it's {@code volatile}
+ * we don't expect multiple threads to call this method concurrently but {@link #lockMgr} will
+ * be read by a different thread than the one writing it, thus it's {@code volatile}
*/
@Override
public HiveLockManager getLockManager() throws LockException {
@@ -179,24 +238,95 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
catch(LockException e) {
if(e.getCause() instanceof TxnAbortedException) {
txnId = 0;
- statementId = -1;
+ writeId = -1;
}
throw e;
}
}
/**
- * This is for testing only. Normally client should call {@link #acquireLocks(org.apache.hadoop.hive.ql.QueryPlan, org.apache.hadoop.hive.ql.Context, String)}
+ * Watermark to include in error msgs and logs
+ * @param queryPlan
+ * @return
+ */
+ private static String getQueryIdWaterMark(QueryPlan queryPlan) {
+ return "queryId=" + queryPlan.getQueryId();
+ }
+
+ private void markExplicitTransaction(QueryPlan queryPlan) throws LockException {
+ isExplicitTransaction = true;
+ if(++startTransactionCount > 1) {
+ throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_IN_TXN, queryPlan.getOperationName(),
+ JavaUtils.txnIdToString(getCurrentTxnId()), queryPlan.getQueryId());
+ }
+
+ }
+ /**
+ * Ensures that the current SQL statement is appropriate for the current state of the
+ * Transaction Manager (e.g. can't call commit unless you called start transaction)
+ *
+ * Note that support for multi-statement txns is a work-in-progress so it's only supported in
+ * HiveConf#HIVE_IN_TEST/HiveConf#TEZ_HIVE_IN_TEST.
+ * @param queryPlan
+ * @throws LockException
+ */
+ private void verifyState(QueryPlan queryPlan) throws LockException {
+ if(!isTxnOpen()) {
+ throw new LockException("No transaction context for operation: " + queryPlan.getOperationName() +
+ " for " + getQueryIdWaterMark(queryPlan));
+ }
+ if(queryPlan.getOperation() == null) {
+ throw new IllegalStateException("Unkown HiverOperation for " + getQueryIdWaterMark(queryPlan));
+ }
+ numStatements++;
+ switch (queryPlan.getOperation()) {
+ case START_TRANSACTION:
+ markExplicitTransaction(queryPlan);
+ break;
+ case COMMIT:
+ case ROLLBACK:
+ if(!isTxnOpen()) {
+ throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, queryPlan.getOperationName());
+ }
+ if(!isExplicitTransaction) {
+ throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_IN_IMPLICIT_TXN, queryPlan.getOperationName());
+ }
+ break;
+ default:
+ if(!queryPlan.getOperation().isAllowedInTransaction() && isExplicitTransaction) {
+ //for example, drop table in an explicit txn is not allowed
+ //in some cases this requires looking at more than just the operation
+ //for example HiveOperation.LOAD - OK if target is MM table but not OK if non-acid table
+ throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_IN_TXN, queryPlan.getOperationName(),
+ JavaUtils.txnIdToString(getCurrentTxnId()), queryPlan.getQueryId());
+ }
+ }
+ /*
+ Should we allow writing to non-transactional tables in an explicit transaction? The user may
+ issue ROLLBACK but these tables won't rollback.
+ Can do this by checking ReadEntity/WriteEntity to determine whether it's reading/writing
+ any non acid and raise an appropriate error
+ * QueryPlan.getAcidSinks() and QueryPlan.hasAcidResourcesInQuery() can be used if any acid is in the query*/
+ }
+ /**
+ * Normally client should call {@link #acquireLocks(org.apache.hadoop.hive.ql.QueryPlan, org.apache.hadoop.hive.ql.Context, String)}
* @param isBlocking if false, the method will return immediately; thus the locks may be in LockState.WAITING
* @return null if no locks were needed
*/
+ @VisibleForTesting
LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isBlocking) throws LockException {
init();
- // Make sure we've built the lock manager
+ // Make sure we've built the lock manager
getLockManager();
-
+ verifyState(plan);
boolean atLeastOneLock = false;
queryId = plan.getQueryId();
+ switch (plan.getOperation()) {
+ case SET_AUTOCOMMIT:
+ /**This is here for documentation purposes. This TM doesn't support this - only has one
+ * mode of operation documented at {@link DbTxnManager#isExplicitTransaction}*/
+ return null;
+ }
LockRequestBuilder rqstBuilder = new LockRequestBuilder(queryId);
//link queryId to txnId
@@ -240,8 +370,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
// This is a file or something we don't hold locks for.
continue;
}
- if(t != null && AcidUtils.isAcidTable(t)) {
- compBuilder.setIsAcid(true);
+ if(t != null) {
+ compBuilder.setIsAcid(AcidUtils.isAcidTable(t));
}
LockComponent comp = compBuilder.build();
LOG.debug("Adding lock component to lock request " + comp.toString());
@@ -262,7 +392,33 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
}
LockComponentBuilder compBuilder = new LockComponentBuilder();
Table t = null;
+ switch (output.getType()) {
+ case DATABASE:
+ compBuilder.setDbName(output.getDatabase().getName());
+ break;
+
+ case TABLE:
+ case DUMMYPARTITION: // in case of dynamic partitioning lock the table
+ t = output.getTable();
+ compBuilder.setDbName(t.getDbName());
+ compBuilder.setTableName(t.getTableName());
+ break;
+
+ case PARTITION:
+ compBuilder.setPartitionName(output.getPartition().getName());
+ t = output.getPartition().getTable();
+ compBuilder.setDbName(t.getDbName());
+ compBuilder.setTableName(t.getTableName());
+ break;
+
+ default:
+ // This is a file or something we don't hold locks for.
+ continue;
+ }
switch (output.getWriteType()) {
+ /* base this on HiveOperation instead? this and DDL_NO_LOCK is peppered all over the code...
+ Seems much cleaner if each stmt is identified as a particular HiveOperation (which I'd think
+ makes sense everywhere). This however would be problematic for merge...*/
case DDL_EXCLUSIVE:
case INSERT_OVERWRITE:
compBuilder.setExclusive();
@@ -270,10 +426,9 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
break;
case INSERT:
- t = getTable(output);
+ assert t != null;
if(AcidUtils.isAcidTable(t)) {
compBuilder.setShared();
- compBuilder.setIsAcid(true);
}
else {
if (conf.getBoolVar(HiveConf.ConfVars.HIVE_TXN_STRICT_LOCKING_MODE)) {
@@ -281,7 +436,6 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
} else { // this is backward compatible for non-ACID resources, w/o ACID semantics
compBuilder.setShared();
}
- compBuilder.setIsAcid(false);
}
compBuilder.setOperationType(DataOperationType.INSERT);
break;
@@ -293,12 +447,10 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
case UPDATE:
compBuilder.setSemiShared();
compBuilder.setOperationType(DataOperationType.UPDATE);
- t = getTable(output);
break;
case DELETE:
compBuilder.setSemiShared();
compBuilder.setOperationType(DataOperationType.DELETE);
- t = getTable(output);
break;
case DDL_NO_LOCK:
@@ -307,34 +459,11 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
default:
throw new RuntimeException("Unknown write type " +
output.getWriteType().toString());
-
}
- switch (output.getType()) {
- case DATABASE:
- compBuilder.setDbName(output.getDatabase().getName());
- break;
-
- case TABLE:
- case DUMMYPARTITION: // in case of dynamic partitioning lock the table
- t = output.getTable();
- compBuilder.setDbName(t.getDbName());
- compBuilder.setTableName(t.getTableName());
- break;
-
- case PARTITION:
- compBuilder.setPartitionName(output.getPartition().getName());
- t = output.getPartition().getTable();
- compBuilder.setDbName(t.getDbName());
- compBuilder.setTableName(t.getTableName());
- break;
-
- default:
- // This is a file or something we don't hold locks for.
- continue;
- }
- if(t != null && AcidUtils.isAcidTable(t)) {
- compBuilder.setIsAcid(true);
+ if(t != null) {
+ compBuilder.setIsAcid(AcidUtils.isAcidTable(t));
}
+
compBuilder.setIsDynamicPartitionWrite(output.isDynamicPartitionWrite());
LockComponent comp = compBuilder.build();
LOG.debug("Adding lock component to lock request " + comp.toString());
@@ -405,7 +534,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
e);
} finally {
txnId = 0;
- statementId = -1;
+ writeId = -1;
+ numStatements = 0;
}
}
@@ -429,7 +559,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
e);
} finally {
txnId = 0;
- statementId = -1;
+ writeId = -1;
+ numStatements = 0;
}
}
@@ -556,6 +687,26 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
public boolean supportsExplicitLock() {
return false;
}
+ @Override
+ public int lockTable(Hive db, LockTableDesc lockTbl) throws HiveException {
+ super.lockTable(db, lockTbl);
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public int unlockTable(Hive hiveDB, UnlockTableDesc unlockTbl) throws HiveException {
+ super.unlockTable(hiveDB, unlockTbl);
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public int lockDatabase(Hive hiveDB, LockDatabaseDesc lockDb) throws HiveException {
+ super.lockDatabase(hiveDB, lockDb);
+ throw new UnsupportedOperationException();
+ }
+ @Override
+ public int unlockDatabase(Hive hiveDB, UnlockDatabaseDesc unlockDb) throws HiveException {
+ super.unlockDatabase(hiveDB, unlockDb);
+ throw new UnsupportedOperationException();
+ }
@Override
public boolean useNewShowLocksFormat() {
@@ -566,7 +717,44 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
public boolean supportsAcid() {
return true;
}
-
+ /**
+ * In an explicit txn start_transaction is the 1st statement and we record the snapshot at the
+ * start of the txn for Snapshot Isolation. For Read Committed (not supported yet) we'd record
+ * it before executing each statement (but after lock acquisition if using lock based concurrency
+ * control).
+ * For implicit txn, the stmt that triggered/started the txn is the first statement
+ */
+ @Override
+ public boolean recordSnapshot(QueryPlan queryPlan) {
+ assert isTxnOpen();
+ assert numStatements > 0 : "was acquireLocks() called already?";
+ if(queryPlan.getOperation() == HiveOperation.START_TRANSACTION) {
+ //here if start of explicit txn
+ assert isExplicitTransaction;
+ assert numStatements == 1;
+ return true;
+ }
+ else if(!isExplicitTransaction) {
+ assert numStatements == 1 : "numStatements=" + numStatements + " in implicit txn";
+ if (queryPlan.hasAcidResourcesInQuery()) {
+ //1st and only stmt in implicit txn and uses acid resource
+ return true;
+ }
+ }
+ return false;
+ }
+ @Override
+ public boolean isImplicitTransactionOpen() {
+ if(!isTxnOpen()) {
+ //some commands like "show databases" don't start implicit transactions
+ return false;
+ }
+ if(!isExplicitTransaction) {
+ assert numStatements == 1 : "numStatements=" + numStatements;
+ return true;
+ }
+ return false;
+ }
@Override
protected void destruct() {
try {
@@ -626,7 +814,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
@Override
public int getWriteIdAndIncrement() {
assert isTxnOpen();
- return statementId++;
+ return writeId++;
}
private static long getHeartbeatInterval(Configuration conf) throws LockException {
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index 187a658..b24351c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -72,7 +72,7 @@ public interface HiveTxnManager {
/**
* Acquire all of the locks needed by a query. If used with a query that
- * requires transactions, this should be called after {@link #openTxn(String)}.
+ * requires transactions, this should be called after {@link #openTxn(Context, String)}.
* A list of acquired locks will be stored in the
* {@link org.apache.hadoop.hive.ql.Context} object and can be retrieved
* via {@link org.apache.hadoop.hive.ql.Context#getHiveLocks}.
@@ -208,17 +208,13 @@ public interface HiveTxnManager {
boolean supportsAcid();
/**
- * This behaves exactly as
- * https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#setAutoCommit(boolean)
- */
- void setAutoCommit(boolean autoCommit) throws LockException;
-
- /**
- * This behaves exactly as
- * https://docs.oracle.com/javase/6/docs/api/java/sql/Connection.html#getAutoCommit()
+ * For resources that support MVCC, the state of the DB must be recorded for the duration of the
+ * operation/transaction. Returns {@code true} if current statement needs to do this.
*/
- boolean getAutoCommit();
+ boolean recordSnapshot(QueryPlan queryPlan);
+ boolean isImplicitTransactionOpen();
+
boolean isTxnOpen();
/**
* if {@code isTxnOpen()}, returns the currently active transaction ID
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
index 9fa416c..8dbbf87 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
@@ -45,7 +45,6 @@ import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
abstract class HiveTxnManagerImpl implements HiveTxnManager {
protected HiveConf conf;
- private boolean isAutoCommit = true;//true by default; matches JDBC spec
void setHiveConf(HiveConf c) {
conf = c;
@@ -68,16 +67,6 @@ abstract class HiveTxnManagerImpl implements HiveTxnManager {
destruct();
}
@Override
- public void setAutoCommit(boolean autoCommit) throws LockException {
- isAutoCommit = autoCommit;
- }
-
- @Override
- public boolean getAutoCommit() {
- return isAutoCommit;
- }
-
- @Override
public int lockTable(Hive db, LockTableDesc lockTbl) throws HiveException {
HiveLockManager lockMgr = getAndCheckLockManager();
@@ -203,4 +192,13 @@ abstract class HiveTxnManagerImpl implements HiveTxnManager {
return lockMgr;
}
+ @Override
+ public boolean recordSnapshot(QueryPlan queryPlan) {
+ return false;
+ }
+ @Override
+ public boolean isImplicitTransactionOpen() {
+ return true;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
index f62cf9a..668783a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
@@ -125,6 +125,7 @@ public class ExplainSemanticAnalyzer extends BaseSemanticAnalyzer {
}
ctx.setExplainConfig(config);
+ ctx.setExplainPlan(true);
ASTNode input = (ASTNode) ast.getChild(0);
// explain analyze is composed of two steps
@@ -137,7 +138,7 @@ public class ExplainSemanticAnalyzer extends BaseSemanticAnalyzer {
Context runCtx = null;
try {
runCtx = new Context(conf);
- // runCtx and ctx share the configuration
+ // runCtx and ctx share the configuration, but not isExplainPlan()
runCtx.setExplainConfig(config);
Driver driver = new Driver(conf, runCtx);
CommandProcessorResponse ret = driver.run(query);
@@ -161,6 +162,9 @@ public class ExplainSemanticAnalyzer extends BaseSemanticAnalyzer {
LOG.info("Explain analyze (analyzing phase) for query " + query);
config.setAnalyze(AnalyzeState.ANALYZING);
}
+ //Creating new QueryState unfortunately causes all .q.out to change - do this in a separate ticket
+ //Sharing QueryState between generating the plan and executing the query seems bad
+ //BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(new QueryState(queryState.getConf()), input);
BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, input);
sem.analyze(input, ctx);
sem.validate();
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
index 520d3de..3c60e03 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
@@ -21,9 +21,9 @@ package org.apache.hadoop.hive.ql.parse;
import org.antlr.runtime.tree.Tree;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.QueryState;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
-import org.apache.hadoop.hive.ql.session.SessionState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.HashMap;
@@ -32,6 +32,7 @@ import java.util.HashMap;
*
*/
public final class SemanticAnalyzerFactory {
+ static final private Logger LOG = LoggerFactory.getLogger(SemanticAnalyzerFactory.class);
static HashMap<Integer, HiveOperation> commandType = new HashMap<Integer, HiveOperation>();
static HashMap<Integer, HiveOperation[]> tablePartitionCommandType = new HashMap<Integer, HiveOperation[]>();
@@ -131,7 +132,6 @@ public final class SemanticAnalyzerFactory {
commandType.put(HiveParser.TOK_REPL_DUMP, HiveOperation.EXPORT); // piggyback on EXPORT security handling for now
commandType.put(HiveParser.TOK_REPL_LOAD, HiveOperation.IMPORT); // piggyback on IMPORT security handling for now
commandType.put(HiveParser.TOK_REPL_STATUS, HiveOperation.SHOW_TBLPROPERTIES); // TODO : also actually DESCDATABASE
-
}
static {
@@ -171,7 +171,22 @@ public final class SemanticAnalyzerFactory {
HiveOperation.ALTERTABLE_UPDATEPARTSTATS});
}
- public static BaseSemanticAnalyzer get(QueryState queryState, ASTNode tree)
+
+ public static BaseSemanticAnalyzer get(QueryState queryState, ASTNode tree) throws SemanticException {
+ BaseSemanticAnalyzer sem = getInternal(queryState, tree);
+ if(queryState.getHiveOperation() == null) {
+ String query = queryState.getQueryString();
+ if(query != null && query.length() > 30) {
+ query = query.substring(0, 30);
+ }
+ String msg = "Unknown HiveOperation for query='" + query + "' queryId=" + queryState.getQueryId();
+ //throw new IllegalStateException(msg);
+ LOG.debug(msg);
+ }
+ return sem;
+ }
+
+ private static BaseSemanticAnalyzer getInternal(QueryState queryState, ASTNode tree)
throws SemanticException {
if (tree.getToken() == null) {
throw new RuntimeException("Empty Syntax Tree");
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java
index d333f91..ecac31f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java
@@ -27,7 +27,7 @@ public enum HiveOperation {
IMPORT("IMPORT", null, new Privilege[]{Privilege.ALTER_METADATA, Privilege.ALTER_DATA}),
CREATEDATABASE("CREATEDATABASE", null, new Privilege[]{Privilege.CREATE}),
DROPDATABASE("DROPDATABASE", null, new Privilege[]{Privilege.DROP}),
- SWITCHDATABASE("SWITCHDATABASE", null, null),
+ SWITCHDATABASE("SWITCHDATABASE", null, null, true, false),
LOCKDB("LOCKDATABASE", new Privilege[]{Privilege.LOCK}, null),
UNLOCKDB("UNLOCKDATABASE", new Privilege[]{Privilege.LOCK}, null),
DROPTABLE ("DROPTABLE", null, new Privilege[]{Privilege.DROP}),
@@ -60,19 +60,19 @@ public enum HiveOperation {
new Privilege[]{Privilege.ALTER_METADATA}, null),
ALTERPARTITION_BUCKETNUM("ALTERPARTITION_BUCKETNUM",
new Privilege[]{Privilege.ALTER_METADATA}, null),
- SHOWDATABASES("SHOWDATABASES", new Privilege[]{Privilege.SHOW_DATABASE}, null),
- SHOWTABLES("SHOWTABLES", null, null),
- SHOWCOLUMNS("SHOWCOLUMNS", null, null),
- SHOW_TABLESTATUS("SHOW_TABLESTATUS", null, null),
- SHOW_TBLPROPERTIES("SHOW_TBLPROPERTIES", null, null),
+ SHOWDATABASES("SHOWDATABASES", new Privilege[]{Privilege.SHOW_DATABASE}, null, true, false),
+ SHOWTABLES("SHOWTABLES", null, null, true, false),
+ SHOWCOLUMNS("SHOWCOLUMNS", null, null, true, false),
+ SHOW_TABLESTATUS("SHOW_TABLESTATUS", null, null, true, false),
+ SHOW_TBLPROPERTIES("SHOW_TBLPROPERTIES", null, null, true, false),
SHOW_CREATEDATABASE("SHOW_CREATEDATABASE", new Privilege[]{Privilege.SELECT}, null),
SHOW_CREATETABLE("SHOW_CREATETABLE", new Privilege[]{Privilege.SELECT}, null),
- SHOWFUNCTIONS("SHOWFUNCTIONS", null, null),
- SHOWINDEXES("SHOWINDEXES", null, null),
+ SHOWFUNCTIONS("SHOWFUNCTIONS", null, null, true, false),
+ SHOWINDEXES("SHOWINDEXES", null, null, true, false),
SHOWPARTITIONS("SHOWPARTITIONS", null, null),
- SHOWLOCKS("SHOWLOCKS", null, null),
+ SHOWLOCKS("SHOWLOCKS", null, null, true, false),
SHOWCONF("SHOWCONF", null, null),
- SHOWVIEWS("SHOWVIEWS", null, null),
+ SHOWVIEWS("SHOWVIEWS", null, null, true, false),
CREATEFUNCTION("CREATEFUNCTION", null, null),
DROPFUNCTION("DROPFUNCTION", null, null),
RELOADFUNCTION("RELOADFUNCTION", null, null),
@@ -94,12 +94,12 @@ public enum HiveOperation {
DROPROLE("DROPROLE", null, null),
GRANT_PRIVILEGE("GRANT_PRIVILEGE", null, null),
REVOKE_PRIVILEGE("REVOKE_PRIVILEGE", null, null),
- SHOW_GRANT("SHOW_GRANT", null, null),
+ SHOW_GRANT("SHOW_GRANT", null, null, true, false),
GRANT_ROLE("GRANT_ROLE", null, null),
REVOKE_ROLE("REVOKE_ROLE", null, null),
- SHOW_ROLES("SHOW_ROLES", null, null),
- SHOW_ROLE_PRINCIPALS("SHOW_ROLE_PRINCIPALS", null, null),
- SHOW_ROLE_GRANT("SHOW_ROLE_GRANT", null, null),
+ SHOW_ROLES("SHOW_ROLES", null, null, true, false),
+ SHOW_ROLE_PRINCIPALS("SHOW_ROLE_PRINCIPALS", null, null, true, false),
+ SHOW_ROLE_GRANT("SHOW_ROLE_GRANT", null, null, true, false),
ALTERTABLE_FILEFORMAT("ALTERTABLE_FILEFORMAT", new Privilege[]{Privilege.ALTER_METADATA}, null),
ALTERPARTITION_FILEFORMAT("ALTERPARTITION_FILEFORMAT", new Privilege[]{Privilege.ALTER_METADATA}, null),
ALTERTABLE_LOCATION("ALTERTABLE_LOCATION", new Privilege[]{Privilege.ALTER_DATA}, null),
@@ -128,8 +128,8 @@ public enum HiveOperation {
ALTERVIEW_RENAME("ALTERVIEW_RENAME", new Privilege[] {Privilege.ALTER_METADATA}, null),
ALTERVIEW_AS("ALTERVIEW_AS", new Privilege[] {Privilege.ALTER_METADATA}, null),
ALTERTABLE_COMPACT("ALTERTABLE_COMPACT", new Privilege[]{Privilege.SELECT}, new Privilege[]{Privilege.ALTER_DATA}),
- SHOW_COMPACTIONS("SHOW COMPACTIONS", null, null),
- SHOW_TRANSACTIONS("SHOW TRANSACTIONS", null, null),
+ SHOW_COMPACTIONS("SHOW COMPACTIONS", null, null, true, false),
+ SHOW_TRANSACTIONS("SHOW TRANSACTIONS", null, null, true, false),
START_TRANSACTION("START TRANSACTION", null, null, false, false),
COMMIT("COMMIT", null, null, true, true),
ROLLBACK("ROLLBACK", null, null, true, true),
@@ -143,7 +143,10 @@ public enum HiveOperation {
private Privilege[] outputRequiredPrivileges;
/**
- * Only a small set of operations is allowed inside an open transactions, e.g. DML
+ * Only a small set of operations is allowed inside an explicit transaction, e.g. DML on
+ * Acid tables or ops w/o persistent side effects like USE DATABASE, SHOW TABLES, etc so
+ * that rollback is meaningful
+ * todo: mark all operations appropriately
*/
private final boolean allowedInTransaction;
private final boolean requiresOpenTransaction;
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index ffce1d1..7692512 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -459,6 +459,21 @@ public class SessionState {
return txnMgr;
}
+ /**
+ * This is only for testing. It allows switching the manager before the (test) operation so that
+ * it's not coupled to the executing thread. Since tests run against Derby which often wedges
+ * under concurrent access, tests must use a single thread and simulate concurrent access.
+ * For example, {@code TestDbTxnManager2}
+ */
+ @VisibleForTesting
+ public HiveTxnManager setTxnMgr(HiveTxnManager mgr) {
+ if(!(sessionConf.getBoolVar(ConfVars.HIVE_IN_TEST) || sessionConf.getBoolVar(ConfVars.HIVE_IN_TEZ_TEST))) {
+ throw new IllegalStateException("Only for testing!");
+ }
+ HiveTxnManager tmp = txnMgr;
+ txnMgr = mgr;
+ return tmp;
+ }
public HadoopShims.HdfsEncryptionShim getHdfsEncryptionShim() throws HiveException {
try {
return getHdfsEncryptionShim(FileSystem.get(sessionConf));
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index adfe98a..23efce0 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -713,7 +713,7 @@ public class TestTxnHandler {
res = txnHandler.lock(req);
assertTrue(res.getState() == LockState.ACQUIRED);
}
-
+ @Ignore("now that every op has a txn ctx, we don't produce the error expected here....")
@Test
public void testWrongLockForOperation() throws Exception {
LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.DB, "mydb");
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/test/org/apache/hadoop/hive/ql/TestErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestErrorMsg.java b/ql/src/test/org/apache/hadoop/hive/ql/TestErrorMsg.java
index c5b658f..7fe902e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestErrorMsg.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestErrorMsg.java
@@ -23,6 +23,7 @@ import java.util.Set;
import junit.framework.Assert;
import junit.framework.TestCase;
+import org.apache.hadoop.hive.common.JavaUtils;
import org.junit.Test;
public class TestErrorMsg {
@@ -37,8 +38,9 @@ public class TestErrorMsg {
}
@Test
public void testReverseMatch() {
- testReverseMatch(ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT, "COMMIT");
- testReverseMatch(ErrorMsg.OP_NOT_ALLOWED_IN_TXN, "ALTER TABLE", "1");
+ testReverseMatch(ErrorMsg.OP_NOT_ALLOWED_IN_IMPLICIT_TXN, "COMMIT");
+ testReverseMatch(ErrorMsg.OP_NOT_ALLOWED_IN_TXN, "ALTER TABLE",
+ JavaUtils.txnIdToString(1), "123");
testReverseMatch(ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, "ROLLBACK");
}
private void testReverseMatch(ErrorMsg errorMsg, String... args) {
[2/3] hive git commit: HIVE-12636 Ensure that all queries (with
DbTxnManager) run in a transaction (Eugene Koifman, reviewed by Wei Zheng)
Posted by ek...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index c8bc119..7c66955 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -137,7 +137,6 @@ public class TestTxnCommands {
public void tearDown() throws Exception {
try {
if (d != null) {
- runStatementOnDriver("set autocommit true");
dropTables();
d.destroy();
d.close();
@@ -194,7 +193,6 @@ public class TestTxnCommands {
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1));
//List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
//Assert.assertEquals("Data didn't match in autocommit=true (rs)", stringifyValues(rows1), rs);
- runStatementOnDriver("set autocommit false");
runStatementOnDriver("START TRANSACTION");
int[][] rows2 = {{5,6},{7,8}};
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows2));
@@ -206,8 +204,8 @@ public class TestTxnCommands {
dumpTableData(Table.ACIDTBL, 1, 0);
dumpTableData(Table.ACIDTBL, 2, 0);
runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
- runStatementOnDriver("COMMIT");//txn started implicitly by previous statement
- runStatementOnDriver("set autocommit true");
+ CommandProcessorResponse cpr = runStatementOnDriverNegative("COMMIT");//txn started implicitly by previous statement
+ Assert.assertEquals("Error didn't match: " + cpr, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN.getErrorCode(), cpr.getErrorCode());
List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
Assert.assertEquals("Data didn't match inside tx (rs0)", allData, rs1);
}
@@ -218,43 +216,35 @@ public class TestTxnCommands {
*/
@Test
public void testErrors() throws Exception {
- runStatementOnDriver("set autocommit true");
- CommandProcessorResponse cpr = runStatementOnDriverNegative("start transaction");
- Assert.assertEquals("Error didn't match: " + cpr, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT.getErrorCode(), cpr.getErrorCode());
- runStatementOnDriver("set autocommit false");
runStatementOnDriver("start transaction");
CommandProcessorResponse cpr2 = runStatementOnDriverNegative("create table foo(x int, y int)");
Assert.assertEquals("Expected DDL to fail in an open txn", ErrorMsg.OP_NOT_ALLOWED_IN_TXN.getErrorCode(), cpr2.getErrorCode());
- runStatementOnDriver("set autocommit true");
CommandProcessorResponse cpr3 = runStatementOnDriverNegative("update " + Table.ACIDTBL + " set a = 1 where b != 1");
Assert.assertEquals("Expected update of bucket column to fail",
"FAILED: SemanticException [Error 10302]: Updating values of bucketing columns is not supported. Column a.",
cpr3.getErrorMessage());
- //line below should in principle work but Driver doesn't propagate errorCode properly
- //Assert.assertEquals("Expected update of bucket column to fail", ErrorMsg.UPDATE_CANNOT_UPDATE_BUCKET_VALUE.getErrorCode(), cpr3.getErrorCode());
- cpr3 = runStatementOnDriverNegative("commit work");//not allowed in AC=true
- Assert.assertEquals("Error didn't match: " + cpr3, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT.getErrorCode(), cpr.getErrorCode());
- cpr3 = runStatementOnDriverNegative("rollback work");//not allowed in AC=true
- Assert.assertEquals("Error didn't match: " + cpr3, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT.getErrorCode(), cpr.getErrorCode());
- runStatementOnDriver("set autocommit false");
+ Assert.assertEquals("Expected update of bucket column to fail",
+ ErrorMsg.UPDATE_CANNOT_UPDATE_BUCKET_VALUE.getErrorCode(), cpr3.getErrorCode());
cpr3 = runStatementOnDriverNegative("commit");//not allowed in w/o tx
- Assert.assertEquals("Error didn't match: " + cpr3, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT.getErrorCode(), cpr.getErrorCode());
+ Assert.assertEquals("Error didn't match: " + cpr3,
+ ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN.getErrorCode(), cpr3.getErrorCode());
cpr3 = runStatementOnDriverNegative("rollback");//not allowed in w/o tx
- Assert.assertEquals("Error didn't match: " + cpr3, ErrorMsg.OP_NOT_ALLOWED_IN_AUTOCOMMIT.getErrorCode(), cpr.getErrorCode());
+ Assert.assertEquals("Error didn't match: " + cpr3,
+ ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN.getErrorCode(), cpr3.getErrorCode());
runStatementOnDriver("start transaction");
cpr3 = runStatementOnDriverNegative("start transaction");//not allowed in a tx
- Assert.assertEquals("Expected start transaction to fail", ErrorMsg.OP_NOT_ALLOWED_IN_TXN.getErrorCode(), cpr3.getErrorCode());
+ Assert.assertEquals("Expected start transaction to fail",
+ ErrorMsg.OP_NOT_ALLOWED_IN_TXN.getErrorCode(), cpr3.getErrorCode());
runStatementOnDriver("start transaction");//ok since previously opened txn was killed
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)");
List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
Assert.assertEquals("Can't see my own write", 1, rs0.size());
- runStatementOnDriver("set autocommit true");//this should commit previous txn
+ runStatementOnDriver("commit work");
rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
Assert.assertEquals("Can't see my own write", 1, rs0.size());
}
@Test
public void testReadMyOwnInsert() throws Exception {
- runStatementOnDriver("set autocommit false");
runStatementOnDriver("START TRANSACTION");
List<String> rs = runStatementOnDriver("select * from " + Table.ACIDTBL);
Assert.assertEquals("Expected empty " + Table.ACIDTBL, 0, rs.size());
@@ -269,7 +259,6 @@ public class TestTxnCommands {
}
@Test
public void testImplicitRollback() throws Exception {
- runStatementOnDriver("set autocommit false");
runStatementOnDriver("START TRANSACTION");
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)");
List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
@@ -286,18 +275,15 @@ public class TestTxnCommands {
}
@Test
public void testExplicitRollback() throws Exception {
- runStatementOnDriver("set autocommit false");
runStatementOnDriver("START TRANSACTION");
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)");
runStatementOnDriver("ROLLBACK");
- runStatementOnDriver("set autocommit true");
List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
Assert.assertEquals("Rollback didn't rollback", 0, rs.size());
}
@Test
public void testMultipleInserts() throws Exception {
- runStatementOnDriver("set autocommit false");
runStatementOnDriver("START TRANSACTION");
int[][] rows1 = {{1,2},{3,4}};
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1));
@@ -310,7 +296,6 @@ public class TestTxnCommands {
runStatementOnDriver("commit");
dumpTableData(Table.ACIDTBL, 1, 0);
dumpTableData(Table.ACIDTBL, 1, 1);
- runStatementOnDriver("set autocommit true");
List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
Assert.assertEquals("Content didn't match after commit rs1", allData, rs1);
}
@@ -320,14 +305,12 @@ public class TestTxnCommands {
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1));
List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
Assert.assertEquals("Content didn't match rs0", stringifyValues(rows1), rs0);
- runStatementOnDriver("set autocommit false");
runStatementOnDriver("START TRANSACTION");
runStatementOnDriver("delete from " + Table.ACIDTBL + " where b = 4");
int[][] updatedData2 = {{1,2}};
List<String> rs3 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
Assert.assertEquals("Wrong data after delete", stringifyValues(updatedData2), rs3);
runStatementOnDriver("commit");
- runStatementOnDriver("set autocommit true");
List<String> rs4 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
Assert.assertEquals("Wrong data after commit", stringifyValues(updatedData2), rs4);
}
@@ -338,7 +321,6 @@ public class TestTxnCommands {
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1));
List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
Assert.assertEquals("Content didn't match rs0", stringifyValues(rows1), rs0);
- runStatementOnDriver("set autocommit false");
runStatementOnDriver("START TRANSACTION");
int[][] rows2 = {{5,6},{7,8}};
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows2));
@@ -351,7 +333,6 @@ public class TestTxnCommands {
List<String> rs2 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
Assert.assertEquals("Wrong data after update", stringifyValues(updatedData), rs2);
runStatementOnDriver("commit");
- runStatementOnDriver("set autocommit true");
List<String> rs4 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
Assert.assertEquals("Wrong data after commit", stringifyValues(updatedData), rs4);
}
@@ -361,7 +342,6 @@ public class TestTxnCommands {
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1));
List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
Assert.assertEquals("Content didn't match rs0", stringifyValues(rows1), rs0);
- runStatementOnDriver("set autocommit false");
runStatementOnDriver("START TRANSACTION");
int[][] rows2 = {{5,6},{7,8}};
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows2));
@@ -382,7 +362,6 @@ public class TestTxnCommands {
List<String> rs3 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
Assert.assertEquals("Wrong data after delete", stringifyValues(updatedData2), rs3);
runStatementOnDriver("commit");
- runStatementOnDriver("set autocommit true");
List<String> rs4 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
Assert.assertEquals("Wrong data after commit", stringifyValues(updatedData2), rs4);
}
@@ -392,7 +371,6 @@ public class TestTxnCommands {
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(rows1));
List<String> rs0 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
Assert.assertEquals("Content didn't match rs0", stringifyValues(rows1), rs0);
- runStatementOnDriver("set autocommit false");
runStatementOnDriver("START TRANSACTION");
runStatementOnDriver("delete from " + Table.ACIDTBL + " where b = 8");
int[][] updatedData2 = {{1,2},{3,4},{5,6}};
@@ -412,7 +390,6 @@ public class TestTxnCommands {
int [][] updatedData4 = {{1,3},{5,3}};
Assert.assertEquals("Wrong data after delete", stringifyValues(updatedData4), rs5);
runStatementOnDriver("commit");
- runStatementOnDriver("set autocommit true");
List<String> rs4 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
Assert.assertEquals("Wrong data after commit", stringifyValues(updatedData4), rs4);
}
@@ -433,7 +410,6 @@ public class TestTxnCommands {
}
@Test
public void testTimeOutReaper() throws Exception {
- runStatementOnDriver("set autocommit false");
runStatementOnDriver("start transaction");
runStatementOnDriver("delete from " + Table.ACIDTBL + " where a = 5");
//make sure currently running txn is considered aborted by housekeeper
@@ -467,7 +443,7 @@ public class TestTxnCommands {
}
}
Assert.assertNotNull(txnInfo);
- Assert.assertEquals(2, txnInfo.getId());
+ Assert.assertEquals(12, txnInfo.getId());
Assert.assertEquals(TxnState.OPEN, txnInfo.getState());
String s =TxnDbUtil.queryToString("select TXN_STARTED, TXN_LAST_HEARTBEAT from TXNS where TXN_ID = " + txnInfo.getId(), false);
String[] vals = s.split("\\s+");
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index e2db5b7..5786c4f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -662,11 +662,11 @@ public class TestTxnCommands2 {
FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
Arrays.sort(buckets);
if (numDelta == 1) {
- Assert.assertEquals("delta_0000001_0000001_0000", status[i].getPath().getName());
+ Assert.assertEquals("delta_0000022_0000022_0000", status[i].getPath().getName());
Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
} else if (numDelta == 2) {
- Assert.assertEquals("delta_0000002_0000002_0000", status[i].getPath().getName());
+ Assert.assertEquals("delta_0000023_0000023_0000", status[i].getPath().getName());
Assert.assertEquals(BUCKET_COUNT, buckets.length);
Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
Assert.assertEquals("bucket_00001", buckets[1].getPath().getName());
@@ -711,7 +711,7 @@ public class TestTxnCommands2 {
Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
} else if (numBase == 2) {
// The new base dir now has two bucket files, since the delta dir has two bucket files
- Assert.assertEquals("base_0000002", status[i].getPath().getName());
+ Assert.assertEquals("base_0000023", status[i].getPath().getName());
Assert.assertEquals(BUCKET_COUNT, buckets.length);
Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
Assert.assertEquals("bucket_00001", buckets[1].getPath().getName());
@@ -738,7 +738,7 @@ public class TestTxnCommands2 {
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
Assert.assertEquals(1, status.length);
- Assert.assertEquals("base_0000002", status[0].getPath().getName());
+ Assert.assertEquals("base_0000023", status[0].getPath().getName());
FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
Arrays.sort(buckets);
Assert.assertEquals(BUCKET_COUNT, buckets.length);
@@ -760,11 +760,6 @@ public class TestTxnCommands2 {
runStatementOnDriver("select * from " + Table.NONACIDORCTBL);
String value = hiveConf.get(ValidTxnList.VALID_TXNS_KEY);
Assert.assertNull("The entry should be null for query that doesn't involve ACID tables", value);
-
- // 2. Run a query against an ACID table, and we should have txn logged in conf
- runStatementOnDriver("select * from " + Table.ACIDTBL);
- value = hiveConf.get(ValidTxnList.VALID_TXNS_KEY);
- Assert.assertNotNull("The entry shouldn't be null for query that involves ACID tables", value);
}
@Test
@@ -773,10 +768,14 @@ public class TestTxnCommands2 {
int[][] tableData = {{1,2},{3,3}};
runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(tableData));
int[][] tableData2 = {{5,3}};
+ //this will cause next txn to be marked aborted but the data is still written to disk
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(tableData2));
- hiveConf.set(ValidTxnList.VALID_TXNS_KEY, "0:");
+ assert hiveConf.get(ValidTxnList.VALID_TXNS_KEY) == null : "previous txn should've cleaned it";
+ //so now if HIVEFETCHTASKCONVERSION were to use a stale value, it would use a
+ //ValidTxnList with HWM=MAX_LONG, i.e. include the data for aborted txn
List<String> rs = runStatementOnDriver("select * from " + Table.ACIDTBL);
- Assert.assertEquals("Missing data", 3, rs.size());
+ Assert.assertEquals("Extra data", 2, rs.size());
}
@Test
public void testUpdateMixedCase() throws Exception {
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java
index e516f18..ea5ecbc 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java
@@ -84,7 +84,6 @@ public class TestTxnCommands2WithSplitUpdate extends TestTxnCommands2 {
runStatementOnDriver("create table acidTblLegacy (a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
runStatementOnDriver("alter table acidTblLegacy SET TBLPROPERTIES ('transactional_properties' = 'default')");
}
-
/**
* Test the query correctness and directory layout for ACID table conversion with split-update
* enabled.
@@ -96,7 +95,8 @@ public class TestTxnCommands2WithSplitUpdate extends TestTxnCommands2 {
* @throws Exception
*/
@Test
- public void testNonAcidToAcidSplitUpdateConversion1() throws Exception {
+ @Override
+ public void testNonAcidToAcidConversion1() throws Exception {
FileSystem fs = FileSystem.get(hiveConf);
FileStatus[] status;
@@ -226,7 +226,8 @@ public class TestTxnCommands2WithSplitUpdate extends TestTxnCommands2 {
* @throws Exception
*/
@Test
- public void testNonAcidToAcidSplitUpdateConversion2() throws Exception {
+ @Override
+ public void testNonAcidToAcidConversion2() throws Exception {
FileSystem fs = FileSystem.get(hiveConf);
FileStatus[] status;
@@ -360,7 +361,8 @@ public class TestTxnCommands2WithSplitUpdate extends TestTxnCommands2 {
* @throws Exception
*/
@Test
- public void testNonAcidToAcidSplitUpdateConversion3() throws Exception {
+ @Override
+ public void testNonAcidToAcidConversion3() throws Exception {
FileSystem fs = FileSystem.get(hiveConf);
FileStatus[] status;
@@ -442,11 +444,11 @@ public class TestTxnCommands2WithSplitUpdate extends TestTxnCommands2 {
FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
Arrays.sort(buckets);
if (numDelta == 1) {
- Assert.assertEquals("delta_0000001_0000001_0000", status[i].getPath().getName());
+ Assert.assertEquals("delta_0000022_0000022_0000", status[i].getPath().getName());
Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
} else if (numDelta == 2) {
- Assert.assertEquals("delta_0000002_0000002_0000", status[i].getPath().getName());
+ Assert.assertEquals("delta_0000023_0000023_0000", status[i].getPath().getName());
Assert.assertEquals(1, buckets.length);
Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
}
@@ -455,7 +457,7 @@ public class TestTxnCommands2WithSplitUpdate extends TestTxnCommands2 {
FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
Arrays.sort(buckets);
if (numDeleteDelta == 1) {
- Assert.assertEquals("delete_delta_0000001_0000001_0000", status[i].getPath().getName());
+ Assert.assertEquals("delete_delta_0000022_0000022_0000", status[i].getPath().getName());
Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
}
@@ -502,7 +504,7 @@ public class TestTxnCommands2WithSplitUpdate extends TestTxnCommands2 {
Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
} else if (numBase == 2) {
// The new base dir now has two bucket files, since the delta dir has two bucket files
- Assert.assertEquals("base_0000002", status[i].getPath().getName());
+ Assert.assertEquals("base_0000023", status[i].getPath().getName());
Assert.assertEquals(1, buckets.length);
Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
}
@@ -528,7 +530,7 @@ public class TestTxnCommands2WithSplitUpdate extends TestTxnCommands2 {
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
Assert.assertEquals(1, status.length);
- Assert.assertEquals("base_0000002", status[0].getPath().getName());
+ Assert.assertEquals("base_0000023", status[0].getPath().getName());
FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
Arrays.sort(buckets);
Assert.assertEquals(1, buckets.length);
http://git-wip-us.apache.org/repos/asf/hive/blob/21909601/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index 9bfc7d1..14ff58e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -36,11 +36,13 @@ import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.metadata.DummyPartition;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import java.util.ArrayList;
@@ -79,13 +81,14 @@ public class TestDbTxnManager {
@Test
public void testSingleReadTable() throws Exception {
addTableInput();
- QueryPlan qp = new MockQueryPlan(this);
+ QueryPlan qp = new MockQueryPlan(this, HiveOperation.QUERY);
+ txnMgr.openTxn(ctx, "fred");
txnMgr.acquireLocks(qp, ctx, "fred");
List<HiveLock> locks = ctx.getHiveLocks();
Assert.assertEquals(1, locks.size());
Assert.assertEquals(1,
TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock) locks.get(0)).lockId));
- txnMgr.getLockManager().unlock(locks.get(0));
+ txnMgr.commitTxn();
locks = txnMgr.getLockManager().getLocks(false, false);
Assert.assertEquals(0, locks.size());
}
@@ -93,13 +96,14 @@ public class TestDbTxnManager {
@Test
public void testSingleReadPartition() throws Exception {
addPartitionInput(newTable(true));
- QueryPlan qp = new MockQueryPlan(this);
+ QueryPlan qp = new MockQueryPlan(this, HiveOperation.QUERY);
+ txnMgr.openTxn(ctx, null);
txnMgr.acquireLocks(qp, ctx, null);
List<HiveLock> locks = ctx.getHiveLocks();
Assert.assertEquals(1, locks.size());
Assert.assertEquals(1,
TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock) locks.get(0)).lockId));
- txnMgr.getLockManager().unlock(locks.get(0));
+ txnMgr.commitTxn();
locks = txnMgr.getLockManager().getLocks(false, false);
Assert.assertEquals(0, locks.size());
@@ -111,13 +115,14 @@ public class TestDbTxnManager {
addPartitionInput(t);
addPartitionInput(t);
addPartitionInput(t);
- QueryPlan qp = new MockQueryPlan(this);
+ QueryPlan qp = new MockQueryPlan(this, HiveOperation.QUERY);
+ txnMgr.openTxn(ctx, "fred");
txnMgr.acquireLocks(qp, ctx, "fred");
List<HiveLock> locks = ctx.getHiveLocks();
Assert.assertEquals(1, locks.size());
Assert.assertEquals(3,
TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock) locks.get(0)).lockId));
- txnMgr.getLockManager().unlock(locks.get(0));
+ txnMgr.commitTxn();
locks = txnMgr.getLockManager().getLocks(false, false);
Assert.assertEquals(0, locks.size());
}
@@ -129,13 +134,14 @@ public class TestDbTxnManager {
addPartitionInput(t);
addPartitionInput(t);
addTableInput();
- QueryPlan qp = new MockQueryPlan(this);
+ QueryPlan qp = new MockQueryPlan(this, HiveOperation.QUERY);
+ txnMgr.openTxn(ctx, "fred");
txnMgr.acquireLocks(qp, ctx, "fred");
List<HiveLock> locks = ctx.getHiveLocks();
Assert.assertEquals(1, locks.size());
Assert.assertEquals(4,
TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock) locks.get(0)).lockId));
- txnMgr.getLockManager().unlock(locks.get(0));
+ txnMgr.commitTxn();
locks = txnMgr.getLockManager().getLocks(false, false);
Assert.assertEquals(0, locks.size());
}
@@ -143,7 +149,7 @@ public class TestDbTxnManager {
@Test
public void testSingleWriteTable() throws Exception {
WriteEntity we = addTableOutput(WriteEntity.WriteType.INSERT);
- QueryPlan qp = new MockQueryPlan(this);
+ QueryPlan qp = new MockQueryPlan(this, HiveOperation.QUERY);
txnMgr.openTxn(ctx, "fred");
txnMgr.acquireLocks(qp, ctx, "fred");
List<HiveLock> locks = ctx.getHiveLocks();
@@ -159,7 +165,7 @@ public class TestDbTxnManager {
@Test
public void testSingleWritePartition() throws Exception {
WriteEntity we = addPartitionOutput(newTable(true), WriteEntity.WriteType.INSERT);
- QueryPlan qp = new MockQueryPlan(this);
+ QueryPlan qp = new MockQueryPlan(this, HiveOperation.QUERY);
txnMgr.openTxn(ctx, "fred");
txnMgr.acquireLocks(qp, ctx, "fred");
List<HiveLock> locks = ctx.getHiveLocks();
@@ -174,7 +180,7 @@ public class TestDbTxnManager {
@Test
public void testWriteDynamicPartition() throws Exception {
WriteEntity we = addDynamicPartitionedOutput(newTable(true), WriteEntity.WriteType.INSERT);
- QueryPlan qp = new MockQueryPlan(this);
+ QueryPlan qp = new MockQueryPlan(this, HiveOperation.QUERY);
txnMgr.openTxn(ctx, "fred");
txnMgr.acquireLocks(qp, ctx, "fred");
List<HiveLock> locks = ctx.getHiveLocks();
@@ -217,7 +223,7 @@ public class TestDbTxnManager {
@Test
public void testExceptions() throws Exception {
addPartitionOutput(newTable(true), WriteEntity.WriteType.INSERT);
- QueryPlan qp = new MockQueryPlan(this);
+ QueryPlan qp = new MockQueryPlan(this, HiveOperation.QUERY);
((DbTxnManager) txnMgr).openTxn(ctx, "NicholasII", HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2);
Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS));
runReaper();
@@ -243,10 +249,11 @@ public class TestDbTxnManager {
txnMgr.rollbackTxn();//this is idempotent
}
+ @Ignore("This seems useless now that we have a txn for everything")
@Test
public void testLockTimeout() throws Exception {
addPartitionInput(newTable(true));
- QueryPlan qp = new MockQueryPlan(this);
+ QueryPlan qp = new MockQueryPlan(this, HiveOperation.QUERY);
//make sure it works with nothing to expire
testLockExpiration(txnMgr, 0, true);
@@ -294,7 +301,7 @@ public class TestDbTxnManager {
addPartitionInput(t);
addPartitionInput(t);
WriteEntity we = addTableOutput(WriteEntity.WriteType.INSERT);
- QueryPlan qp = new MockQueryPlan(this);
+ QueryPlan qp = new MockQueryPlan(this, HiveOperation.QUERY);
txnMgr.openTxn(ctx, "fred");
txnMgr.acquireLocks(qp, ctx, "fred");
List<HiveLock> locks = ctx.getHiveLocks();
@@ -309,7 +316,7 @@ public class TestDbTxnManager {
@Test
public void testUpdate() throws Exception {
WriteEntity we = addTableOutput(WriteEntity.WriteType.UPDATE);
- QueryPlan qp = new MockQueryPlan(this);
+ QueryPlan qp = new MockQueryPlan(this, HiveOperation.QUERY);
txnMgr.openTxn(ctx, "fred");
txnMgr.acquireLocks(qp, ctx, "fred");
List<HiveLock> locks = ctx.getHiveLocks();
@@ -324,7 +331,7 @@ public class TestDbTxnManager {
@Test
public void testDelete() throws Exception {
WriteEntity we = addTableOutput(WriteEntity.WriteType.DELETE);
- QueryPlan qp = new MockQueryPlan(this);
+ QueryPlan qp = new MockQueryPlan(this, HiveOperation.QUERY);
txnMgr.openTxn(ctx, "fred");
txnMgr.acquireLocks(qp, ctx, "fred");
List<HiveLock> locks = ctx.getHiveLocks();
@@ -339,7 +346,7 @@ public class TestDbTxnManager {
@Test
public void testRollback() throws Exception {
WriteEntity we = addTableOutput(WriteEntity.WriteType.DELETE);
- QueryPlan qp = new MockQueryPlan(this);
+ QueryPlan qp = new MockQueryPlan(this, HiveOperation.QUERY);
txnMgr.openTxn(ctx, "fred");
txnMgr.acquireLocks(qp, ctx, "fred");
List<HiveLock> locks = ctx.getHiveLocks();
@@ -354,13 +361,14 @@ public class TestDbTxnManager {
@Test
public void testDDLExclusive() throws Exception {
WriteEntity we = addTableOutput(WriteEntity.WriteType.DDL_EXCLUSIVE);
- QueryPlan qp = new MockQueryPlan(this);
+ QueryPlan qp = new MockQueryPlan(this, HiveOperation.DROPTABLE);
+ txnMgr.openTxn(ctx, "fred");
txnMgr.acquireLocks(qp, ctx, "fred");
List<HiveLock> locks = ctx.getHiveLocks();
Assert.assertEquals(1, locks.size());
Assert.assertEquals(1,
TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock) locks.get(0)).lockId));
- txnMgr.getLockManager().unlock(locks.get(0));
+ txnMgr.rollbackTxn();
locks = txnMgr.getLockManager().getLocks(false, false);
Assert.assertEquals(0, locks.size());
}
@@ -368,13 +376,14 @@ public class TestDbTxnManager {
@Test
public void testDDLShared() throws Exception {
WriteEntity we = addTableOutput(WriteEntity.WriteType.DDL_SHARED);
- QueryPlan qp = new MockQueryPlan(this);
+ QueryPlan qp = new MockQueryPlan(this, HiveOperation.ALTERTABLE_EXCHANGEPARTITION);
+ txnMgr.openTxn(ctx, "fred");
txnMgr.acquireLocks(qp, ctx, "fred");
List<HiveLock> locks = ctx.getHiveLocks();
Assert.assertEquals(1, locks.size());
Assert.assertEquals(1,
TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock) locks.get(0)).lockId));
- txnMgr.getLockManager().unlock(locks.get(0));
+ txnMgr.commitTxn();
locks = txnMgr.getLockManager().getLocks(false, false);
Assert.assertEquals(0, locks.size());
}
@@ -382,10 +391,12 @@ public class TestDbTxnManager {
@Test
public void testDDLNoLock() throws Exception {
WriteEntity we = addTableOutput(WriteEntity.WriteType.DDL_NO_LOCK);
- QueryPlan qp = new MockQueryPlan(this);
+ QueryPlan qp = new MockQueryPlan(this, HiveOperation.CREATEDATABASE);
+ txnMgr.openTxn(ctx, "fred");
txnMgr.acquireLocks(qp, ctx, "fred");
List<HiveLock> locks = ctx.getHiveLocks();
Assert.assertNull(locks);
+ txnMgr.rollbackTxn();
}
@Test
@@ -406,11 +417,12 @@ public class TestDbTxnManager {
@Test
public void testLockAcquisitionAndRelease() throws Exception {
addTableInput();
- QueryPlan qp = new MockQueryPlan(this);
+ QueryPlan qp = new MockQueryPlan(this, HiveOperation.QUERY);
+ txnMgr.openTxn(ctx, "fred");
txnMgr.acquireLocks(qp, ctx, "fred");
List<HiveLock> locks = ctx.getHiveLocks();
Assert.assertEquals(1, locks.size());
- txnMgr.releaseLocks(locks);
+ txnMgr.commitTxn();
locks = txnMgr.getLockManager().getLocks(false, false);
Assert.assertEquals(0, locks.size());
}
@@ -421,7 +433,7 @@ public class TestDbTxnManager {
addTableInput();
LockException exception = null;
- QueryPlan qp = new MockQueryPlan(this);
+ QueryPlan qp = new MockQueryPlan(this, HiveOperation.QUERY);
// Case 1: If there's no delay for the heartbeat, txn should be able to commit
txnMgr.openTxn(ctx, "fred");
@@ -493,7 +505,8 @@ public class TestDbTxnManager {
private final HashSet<WriteEntity> outputs = new HashSet<>();
private final String queryId;
- MockQueryPlan(TestDbTxnManager test) {
+ MockQueryPlan(TestDbTxnManager test, HiveOperation operation) {
+ super(operation);
inputs.addAll(test.readEntities);
outputs.addAll(test.writeEntities);
queryId = makeQueryId();