You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sp...@apache.org on 2016/05/06 20:43:04 UTC
[33/50] [abbrv] hive git commit: HIVE-13395 Lost Update problem in
ACID (Eugene Koifman, reviewed by Alan Gates)
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index e94af55..c956d78 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -17,7 +17,13 @@
*/
package org.apache.hadoop.hive.ql.lockmgr;
-import junit.framework.Assert;
+import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.TestTxnCommands2;
+import org.apache.hadoop.hive.ql.txn.AcidWriteSetService;
+import org.junit.After;
+import org.junit.Assert;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.LockType;
@@ -29,23 +35,32 @@ import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
-import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
/**
* See additional tests in {@link org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager}
* Tests here are "end-to-end"ish and simulate concurrent queries.
+ *
+ * The general approach is to use an instance of Driver to use Driver.run() to create tables
+ * Use Driver.compile() to generate QueryPlan which can then be passed to HiveTxnManager.acquireLocks().
+ * Same HiveTxnManager is used to openTxn()/commitTxn() etc. This can exercise almost the entire
+ * code path that CLI would but with the advantage that you can create a 2nd HiveTxnManager and then
+ * simulate interleaved transactional/locking operations but all from within a single thread.
+ * The later not only controls concurrency precisely but is the only way to run in UT env with DerbyDB.
*/
public class TestDbTxnManager2 {
private static HiveConf conf = new HiveConf(Driver.class);
private HiveTxnManager txnMgr;
private Context ctx;
private Driver driver;
+ TxnStore txnHandler;
@BeforeClass
public static void setUpClass() throws Exception {
@@ -60,15 +75,17 @@ public class TestDbTxnManager2 {
driver.init();
TxnDbUtil.cleanDb();
TxnDbUtil.prepDb();
- txnMgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ SessionState ss = SessionState.get();
+ ss.initTxnMgr(conf);
+ txnMgr = ss.getTxnMgr();
Assert.assertTrue(txnMgr instanceof DbTxnManager);
+ txnHandler = TxnUtils.getTxnStore(conf);
+
}
@After
public void tearDown() throws Exception {
driver.close();
if (txnMgr != null) txnMgr.closeTxnManager();
- TxnDbUtil.cleanDb();
- TxnDbUtil.prepDb();
}
@Test
public void testLocksInSubquery() throws Exception {
@@ -192,22 +209,24 @@ public class TestDbTxnManager2 {
checkCmdOnDriver(cpr);
cpr = driver.compileAndRespond("update temp.T7 set a = 5 where b = 6");
checkCmdOnDriver(cpr);
+ txnMgr.openTxn("Fifer");
txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");
- List<HiveLock> updateLocks = ctx.getHiveLocks();
- cpr = driver.compileAndRespond("drop database if exists temp");
- LockState lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Fiddler", false);//gets SS lock on T7
+ checkCmdOnDriver(driver.compileAndRespond("drop database if exists temp"));
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ //txnMgr2.openTxn("Fiddler");
+ ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Fiddler", false);//gets SS lock on T7
List<ShowLocksResponseElement> locks = getLocks();
Assert.assertEquals("Unexpected lock count", 2, locks.size());
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "temp", "T7", null, locks.get(0));
checkLock(LockType.EXCLUSIVE, LockState.WAITING, "temp", null, null, locks.get(1));
- txnMgr.getLockManager().releaseLocks(updateLocks);
- lockState = ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(1).getLockid());
+ txnMgr.commitTxn();
+ ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(1).getLockid());
locks = getLocks();
Assert.assertEquals("Unexpected lock count", 1, locks.size());
checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "temp", null, null, locks.get(0));
List<HiveLock> xLock = new ArrayList<HiveLock>(0);
xLock.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
- txnMgr.getLockManager().releaseLocks(xLock);
+ txnMgr2.getLockManager().releaseLocks(xLock);
}
@Test
public void updateSelectUpdate() throws Exception {
@@ -215,29 +234,27 @@ public class TestDbTxnManager2 {
checkCmdOnDriver(cpr);
cpr = driver.compileAndRespond("delete from T8 where b = 89");
checkCmdOnDriver(cpr);
+ txnMgr.openTxn("Fifer");
txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");//gets SS lock on T8
- List<HiveLock> deleteLocks = ctx.getHiveLocks();
cpr = driver.compileAndRespond("select a from T8");//gets S lock on T8
checkCmdOnDriver(cpr);
- txnMgr.acquireLocks(driver.getPlan(), ctx, "Fiddler");
- cpr = driver.compileAndRespond("update T8 set a = 1 where b = 1");
- checkCmdOnDriver(cpr);
- LockState lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false);//waits for SS lock on T8 from fifer
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ txnMgr2.openTxn("Fiddler");
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "Fiddler");
+ checkCmdOnDriver(driver.compileAndRespond("update T8 set a = 1 where b = 1"));
+ ((DbTxnManager) txnMgr2).acquireLocks(driver.getPlan(), ctx, "Practical", false);//waits for SS lock on T8 from fifer
List<ShowLocksResponseElement> locks = getLocks();
Assert.assertEquals("Unexpected lock count", 3, locks.size());
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T8", null, locks.get(0));
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "T8", null, locks.get(1));
checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "T8", null, locks.get(2));
- txnMgr.getLockManager().releaseLocks(deleteLocks);
- lockState = ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());
+ txnMgr.rollbackTxn();
+ ((DbLockManager)txnMgr2.getLockManager()).checkLock(locks.get(2).getLockid());
locks = getLocks();
Assert.assertEquals("Unexpected lock count", 2, locks.size());
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T8", null, locks.get(0));
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "T8", null, locks.get(1));
- List<HiveLock> relLocks = new ArrayList<HiveLock>(2);
- relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
- relLocks.add(new DbLockManager.DbHiveLock(locks.get(1).getLockid()));
- txnMgr.getLockManager().releaseLocks(relLocks);
+ txnMgr2.commitTxn();
cpr = driver.run("drop table if exists T6");
locks = getLocks();
Assert.assertEquals("Unexpected number of locks found", 0, locks.size());
@@ -617,12 +634,12 @@ public class TestDbTxnManager2 {
txnMgr.getLockManager().releaseLocks(relLocks);
}
- private void checkLock(LockType type, LockState state, String db, String table, String partition, ShowLocksResponseElement l) {
- Assert.assertEquals(l.toString(),l.getType(), type);
- Assert.assertEquals(l.toString(),l.getState(), state);
- Assert.assertEquals(l.toString(), normalizeCase(l.getDbname()), normalizeCase(db));
- Assert.assertEquals(l.toString(), normalizeCase(l.getTablename()), normalizeCase(table));
- Assert.assertEquals(l.toString(), normalizeCase(l.getPartname()), normalizeCase(partition));
+ private void checkLock(LockType expectedType, LockState expectedState, String expectedDb, String expectedTable, String expectedPartition, ShowLocksResponseElement actual) {
+ Assert.assertEquals(actual.toString(), expectedType, actual.getType());
+ Assert.assertEquals(actual.toString(), expectedState,actual.getState());
+ Assert.assertEquals(actual.toString(), normalizeCase(expectedDb), normalizeCase(actual.getDbname()));
+ Assert.assertEquals(actual.toString(), normalizeCase(expectedTable), normalizeCase(actual.getTablename()));
+ Assert.assertEquals(actual.toString(), normalizeCase(expectedPartition), normalizeCase(actual.getPartname()));
}
private void checkCmdOnDriver(CommandProcessorResponse cpr) {
Assert.assertTrue(cpr.toString(), cpr.getResponseCode() == 0);
@@ -637,4 +654,541 @@ public class TestDbTxnManager2 {
ShowLocksResponse rsp = ((DbLockManager)txnMgr.getLockManager()).getLocks();
return rsp.getLocks();
}
+
+ /**
+ * txns update same resource but do not overlap in time - no conflict
+ */
+ @Test
+ public void testWriteSetTracking1() throws Exception {
+ CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " +
+ "partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+
+ checkCmdOnDriver(driver.compileAndRespond("select * from TAB_PART"));
+ txnMgr.openTxn("Nicholas");
+ checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "Nicholas");
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ txnMgr.commitTxn();
+ txnMgr2.openTxn("Alexandra");
+ checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "Nicholas");
+ txnMgr2.commitTxn();
+ }
+ /**
+ * txns overlap in time but do not update same resource - no conflict
+ */
+ @Test
+ public void testWriteSetTracking2() throws Exception {
+ CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " +
+ "partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ cpr = driver.run("create table if not exists TAB2 (a int, b int) partitioned by (p string) " +
+ "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ txnMgr.openTxn("Peter");
+ checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "Peter");
+ txnMgr2.openTxn("Catherine");
+ List<ShowLocksResponseElement> locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ //note that "update" uses dynamic partitioning thus lock is on the table not partition
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0));
+ txnMgr.commitTxn();
+ checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 9 where p = 'doh'"));
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "Catherine");
+ txnMgr2.commitTxn();
+ }
+
+ /**
+ * txns overlap and update the same resource - can't commit 2nd txn
+ */
+ @Test
+ public void testWriteSetTracking3() throws Exception {
+ CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " +
+ "partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+
+ txnMgr.openTxn("Known");
+ txnMgr2.openTxn("Unknown");
+ checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "Known");
+ List<ShowLocksResponseElement> locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0));
+ checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
+ ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Unknown", false);
+ locks = getLocks(txnMgr2);//should not matter which txnMgr is used here
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB_PART", null, locks.get(1));
+ txnMgr.commitTxn();
+ LockException expectedException = null;
+ try {
+ txnMgr2.commitTxn();
+ }
+ catch (LockException e) {
+ expectedException = e;
+ }
+ Assert.assertTrue("Didn't get exception", expectedException != null);
+ Assert.assertEquals("Got wrong message code", ErrorMsg.TXN_ABORTED, expectedException.getCanonicalErrorMsg());
+ Assert.assertEquals("Exception msg didn't match",
+ "Aborting [txnid:2,2] due to a write conflict on default/tab_part committed by [txnid:1,2]",
+ expectedException.getCause().getMessage());
+ }
+ /**
+ * txns overlap, update same resource, simulate multi-stmt txn case
+ * Also tests that we kill txn when it tries to acquire lock if we already know it will not be committed
+ */
+ @Test
+ public void testWriteSetTracking4() throws Exception {
+ Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " +
+ "partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ cpr = driver.run("create table if not exists TAB2 (a int, b int) partitioned by (p string) " +
+ "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+
+ txnMgr.openTxn("Long Running");
+ checkCmdOnDriver(driver.compileAndRespond("select a from TAB_PART where p = 'blah'"));
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "Long Running");
+ List<ShowLocksResponseElement> locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ //for some reason this just locks the table; if I alter table to add this partition, then
+ //we end up locking both table and partition with share_read. (Plan has 2 ReadEntities)...?
+ //same for other locks below
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0));
+
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ txnMgr2.openTxn("Short Running");
+ checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 7 where p = 'blah'"));//no such partition
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "Short Running");
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", null, locks.get(1));
+ //update stmt has p=blah, thus nothing is actually update and we generate empty dyn part list
+ Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(),
+ "default", "tab2", Collections.EMPTY_LIST));
+ txnMgr2.commitTxn();
+ //Short Running updated nothing, so we expect 0 rows in WRITE_SET
+ Assert.assertEquals( 0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+
+ txnMgr2.openTxn("T3");
+ checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 7 where p = 'two'"));//pretend this partition exists
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "T3");
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", null, locks.get(1));//since TAB2 is empty
+ //update stmt has p=blah, thus nothing is actually update and we generate empty dyn part list
+ Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(),
+ "default", "tab2", Collections.singletonList("p=two")));//simulate partition update
+ txnMgr2.commitTxn();
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+
+ AcidWriteSetService houseKeeper = new AcidWriteSetService();
+ TestTxnCommands2.runHouseKeeperService(houseKeeper, conf);
+ //since T3 overlaps with Long Running (still open) GC does nothing
+ Assert.assertEquals(1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 17 where a = 1"));//no rows match
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "Long Running");
+ //so generate empty Dyn Part call
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(),
+ "default", "tab2", Collections.EMPTY_LIST));
+ txnMgr.commitTxn();
+
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 0, locks.size());
+ TestTxnCommands2.runHouseKeeperService(houseKeeper, conf);
+ Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ }
+ /**
+ * overlapping txns updating the same resource but 1st one rolls back; 2nd commits
+ * @throws Exception
+ */
+ @Test
+ public void testWriteSetTracking5() throws Exception {
+ Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " +
+ "partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+
+ txnMgr.openTxn("Known");
+ txnMgr2.openTxn("Unknown");
+ checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "Known");
+ List<ShowLocksResponseElement> locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0));
+ checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
+ ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Unknown", false);
+ locks = getLocks(txnMgr2);//should not matter which txnMgr is used here
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB_PART", null, locks.get(1));
+ txnMgr.rollbackTxn();
+ Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ txnMgr2.commitTxn();//since conflicting txn rolled back, commit succeeds
+ Assert.assertEquals(1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ }
+ /**
+ * check that read query concurrent with txn works ok
+ */
+ @Test
+ public void testWriteSetTracking6() throws Exception {
+ Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ CommandProcessorResponse cpr = driver.run("create table if not exists TAB2(a int, b int) clustered " +
+ "by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ checkCmdOnDriver(driver.compileAndRespond("select * from TAB2 where a = 113"));
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "Works");
+ List<ShowLocksResponseElement> locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB2", null, locks.get(0));
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ txnMgr2.openTxn("Horton");
+ checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 17 where a = 101"));
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "Horton");
+ Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB2", null, locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", null, locks.get(1));
+ txnMgr2.commitTxn();//no conflict
+ Assert.assertEquals(1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB2", null, locks.get(0));
+ TestTxnCommands2.runHouseKeeperService(new AcidWriteSetService(), conf);
+ Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ }
+
+ /**
+ * 2 concurrent txns update different partitions of the same table and succeed
+ * @throws Exception
+ */
+ @Test
+ public void testWriteSetTracking7() throws Exception {
+ Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ CommandProcessorResponse cpr = driver.run("create table if not exists tab2 (a int, b int) " +
+ "partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ checkCmdOnDriver(driver.run("insert into tab2 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+
+ //test with predicates such that partition pruning works
+ txnMgr2.openTxn("T2");
+ checkCmdOnDriver(driver.compileAndRespond("update tab2 set b = 7 where p='two'"));
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
+ List<ShowLocksResponseElement> locks = getLocks(txnMgr2);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=two", locks.get(0));
+
+ //now start concurrent txn
+ txnMgr.openTxn("T3");
+ checkCmdOnDriver(driver.compileAndRespond("update tab2 set b = 7 where p='one'"));
+ ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=two", locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=one", locks.get(1));
+
+ //this simulates the completion of txnid:2
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab2",
+ Collections.singletonList("p=two")));
+ txnMgr2.commitTxn();//txnid:2
+
+ locks = getLocks(txnMgr2);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=one", locks.get(0));
+ //completion of txnid:3
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab2",
+ Collections.singletonList("p=one")));
+ txnMgr.commitTxn();//txnid:3
+ //now both txns concurrently updated TAB2 but different partitions.
+
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u'"));
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='u'"));
+ //2 from txnid:1, 1 from txnid:2, 1 from txnid:3
+ Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+ 4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab2' and ctc_partition is not null"));
+
+ //================
+ //test with predicates such that partition pruning doesn't kick in
+ cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
+ "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:4
+ txnMgr2.openTxn("T5");
+ checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1"));
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "T5");
+ locks = getLocks(txnMgr2);
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+
+ //now start concurrent txn
+ txnMgr.openTxn("T6");
+ checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b = 2"));
+ ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T6", false);
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 4, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+ checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(2));
+ checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=one", locks.get(3));
+
+ //this simulates the completion of txnid:5
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=one")));
+ txnMgr2.commitTxn();//txnid:5
+
+ ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id)
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+ //completion of txnid:6
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=two")));
+ txnMgr.commitTxn();//txnid:6
+
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u' and ws_table='tab1'"));
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='u' and ws_table='tab1'"));
+ //2 from insert + 1 for each update stmt
+ Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+ 4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null"));
+ }
+ /**
+ * Concurrent updates with partition pruning predicate and w/o one
+ */
+ @Test
+ public void testWriteSetTracking8() throws Exception {
+ CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
+ "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ txnMgr2.openTxn("T2");
+ checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1"));
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
+ List<ShowLocksResponseElement> locks = getLocks(txnMgr2);
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+
+ //now start concurrent txn
+ txnMgr.openTxn("T3");
+ checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where p='two'"));
+ ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 3, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+ checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(2));
+
+ //this simulates the completion of txnid:2
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=one")));
+ txnMgr2.commitTxn();//txnid:2
+
+ ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id)
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ //completion of txnid:3
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=two")));
+ txnMgr.commitTxn();//txnid:3
+
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u' and ws_table='tab1'"));
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='u' and ws_table='tab1'"));
+ Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+ 4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null"));
+ }
+ /**
+ * Concurrent update/delete of different partitions - should pass
+ */
+ @Test
+ public void testWriteSetTracking9() throws Exception {
+ CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
+ "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ txnMgr2.openTxn("T2");
+ checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1"));
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
+ List<ShowLocksResponseElement> locks = getLocks(txnMgr2);
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+
+ //now start concurrent txn
+ txnMgr.openTxn("T3");
+ checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2"));
+ ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 3, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+ checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(2));
+
+ //this simulates the completion of txnid:2
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=one")));
+ txnMgr2.commitTxn();//txnid:2
+
+ ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id)
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ //completion of txnid:3
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=two")));
+ txnMgr.commitTxn();//txnid:3
+
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u' and ws_table='tab1'"));
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='u' and ws_table='tab1'"));
+ Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+ 4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null"));
+ }
+ /**
+ * Concurrent update/delete of same partition - should fail to commit
+ */
+ @Test
+ public void testWriteSetTracking10() throws Exception {
+ CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
+ "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ txnMgr2.openTxn("T2");
+ checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=2"));
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
+ List<ShowLocksResponseElement> locks = getLocks(txnMgr2);
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+
+ //now start concurrent txn
+ txnMgr.openTxn("T3");
+ checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2"));
+ ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 3, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+ checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(2));
+
+ //this simulates the completion of txnid:2
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=two")));
+ txnMgr2.commitTxn();//txnid:2
+
+ ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id)
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ //completion of txnid:3
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=two")));
+ LockException exception = null;
+ try {
+ txnMgr.commitTxn();//txnid:3
+ }
+ catch(LockException e) {
+ exception = e;
+ }
+ Assert.assertNotEquals("Expected exception", null, exception);
+ Assert.assertEquals("Exception msg doesn't match",
+ "Aborting [txnid:3,3] due to a write conflict on default/tab1/p=two committed by [txnid:2,3]",
+ exception.getCause().getMessage());
+
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='u' and ws_table='tab1'"));
+ Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+ 3, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null"));
+ }
+ /**
+ * Concurrent delte/detele of same partition - should pass
+ * This test doesn't work yet, because we don't yet pass in operation type
+ *
+ * todo: Concurrent insert/update of same partition - should pass
+ */
+ @Ignore("HIVE-13622")
+ @Test
+ public void testWriteSetTracking11() throws Exception {
+ CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
+ "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ txnMgr2.openTxn("T2");
+ checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where b=2"));
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
+ List<ShowLocksResponseElement> locks = getLocks(txnMgr2);
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+
+ //now start concurrent txn
+ txnMgr.openTxn("T3");
+ checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2"));
+ ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 3, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+ checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(2));
+
+ //this simulates the completion of txnid:2
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=two")));
+ txnMgr2.commitTxn();//txnid:2
+
+ ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id)
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ //completion of txnid:3
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=two")));
+ LockException exception = null;
+ try {
+ txnMgr.commitTxn();//txnid:3
+ }
+ catch(LockException e) {
+ exception = e;
+ }
+ Assert.assertNotEquals("Expected exception", null, exception);
+ Assert.assertEquals("Exception msg doesn't match",
+ "Aborting [txnid:3,3] due to a write conflict on default/tab1/p=two committed by [txnid:2,3]",
+ exception.getCause().getMessage());
+
+ //todo: this currently fails since we don't yet set operation type properly
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1'"));
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1'"));
+ Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+ 4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null"));
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index a247065..1578bfb 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.hive.metastore.api.LockLevel;
import org.apache.hadoop.hive.metastore.api.LockRequest;
import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
+import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
@@ -261,6 +263,8 @@ public class TestCleaner extends CompactorTest {
List<LockComponent> components = new ArrayList<LockComponent>(1);
components.add(comp);
LockRequest req = new LockRequest(components, "me", "localhost");
+ OpenTxnsResponse resp = txnHandler.openTxns(new OpenTxnRequest(1, "Dracula", "Transylvania"));
+ req.setTxnid(resp.getTxn_ids().get(0));
LockResponse res = txnHandler.lock(req);
startCleaner();