You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2016/05/06 17:18:51 UTC
[01/13] hive git commit: HIVE-13395 Lost Update problem in ACID
(Eugene Koifman, reviewed by Alan Gates)
Repository: hive
Updated Branches:
refs/heads/llap 89ec219e1 -> f089f2e64
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index e94af55..c956d78 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -17,7 +17,13 @@
*/
package org.apache.hadoop.hive.ql.lockmgr;
-import junit.framework.Assert;
+import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.TestTxnCommands2;
+import org.apache.hadoop.hive.ql.txn.AcidWriteSetService;
+import org.junit.After;
+import org.junit.Assert;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.LockType;
@@ -29,23 +35,32 @@ import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
-import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
/**
* See additional tests in {@link org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager}
* Tests here are "end-to-end"ish and simulate concurrent queries.
+ *
+ * The general approach is to use an instance of Driver to use Driver.run() to create tables
+ * Use Driver.compile() to generate QueryPlan which can then be passed to HiveTxnManager.acquireLocks().
+ * Same HiveTxnManager is used to openTxn()/commitTxn() etc. This can exercise almost the entire
+ * code path that CLI would but with the advantage that you can create a 2nd HiveTxnManager and then
+ * simulate interleaved transactional/locking operations but all from within a single thread.
+ * The later not only controls concurrency precisely but is the only way to run in UT env with DerbyDB.
*/
public class TestDbTxnManager2 {
private static HiveConf conf = new HiveConf(Driver.class);
private HiveTxnManager txnMgr;
private Context ctx;
private Driver driver;
+ TxnStore txnHandler;
@BeforeClass
public static void setUpClass() throws Exception {
@@ -60,15 +75,17 @@ public class TestDbTxnManager2 {
driver.init();
TxnDbUtil.cleanDb();
TxnDbUtil.prepDb();
- txnMgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ SessionState ss = SessionState.get();
+ ss.initTxnMgr(conf);
+ txnMgr = ss.getTxnMgr();
Assert.assertTrue(txnMgr instanceof DbTxnManager);
+ txnHandler = TxnUtils.getTxnStore(conf);
+
}
@After
public void tearDown() throws Exception {
driver.close();
if (txnMgr != null) txnMgr.closeTxnManager();
- TxnDbUtil.cleanDb();
- TxnDbUtil.prepDb();
}
@Test
public void testLocksInSubquery() throws Exception {
@@ -192,22 +209,24 @@ public class TestDbTxnManager2 {
checkCmdOnDriver(cpr);
cpr = driver.compileAndRespond("update temp.T7 set a = 5 where b = 6");
checkCmdOnDriver(cpr);
+ txnMgr.openTxn("Fifer");
txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");
- List<HiveLock> updateLocks = ctx.getHiveLocks();
- cpr = driver.compileAndRespond("drop database if exists temp");
- LockState lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Fiddler", false);//gets SS lock on T7
+ checkCmdOnDriver(driver.compileAndRespond("drop database if exists temp"));
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ //txnMgr2.openTxn("Fiddler");
+ ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Fiddler", false);//gets SS lock on T7
List<ShowLocksResponseElement> locks = getLocks();
Assert.assertEquals("Unexpected lock count", 2, locks.size());
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "temp", "T7", null, locks.get(0));
checkLock(LockType.EXCLUSIVE, LockState.WAITING, "temp", null, null, locks.get(1));
- txnMgr.getLockManager().releaseLocks(updateLocks);
- lockState = ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(1).getLockid());
+ txnMgr.commitTxn();
+ ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(1).getLockid());
locks = getLocks();
Assert.assertEquals("Unexpected lock count", 1, locks.size());
checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "temp", null, null, locks.get(0));
List<HiveLock> xLock = new ArrayList<HiveLock>(0);
xLock.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
- txnMgr.getLockManager().releaseLocks(xLock);
+ txnMgr2.getLockManager().releaseLocks(xLock);
}
@Test
public void updateSelectUpdate() throws Exception {
@@ -215,29 +234,27 @@ public class TestDbTxnManager2 {
checkCmdOnDriver(cpr);
cpr = driver.compileAndRespond("delete from T8 where b = 89");
checkCmdOnDriver(cpr);
+ txnMgr.openTxn("Fifer");
txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");//gets SS lock on T8
- List<HiveLock> deleteLocks = ctx.getHiveLocks();
cpr = driver.compileAndRespond("select a from T8");//gets S lock on T8
checkCmdOnDriver(cpr);
- txnMgr.acquireLocks(driver.getPlan(), ctx, "Fiddler");
- cpr = driver.compileAndRespond("update T8 set a = 1 where b = 1");
- checkCmdOnDriver(cpr);
- LockState lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false);//waits for SS lock on T8 from fifer
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ txnMgr2.openTxn("Fiddler");
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "Fiddler");
+ checkCmdOnDriver(driver.compileAndRespond("update T8 set a = 1 where b = 1"));
+ ((DbTxnManager) txnMgr2).acquireLocks(driver.getPlan(), ctx, "Practical", false);//waits for SS lock on T8 from fifer
List<ShowLocksResponseElement> locks = getLocks();
Assert.assertEquals("Unexpected lock count", 3, locks.size());
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T8", null, locks.get(0));
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "T8", null, locks.get(1));
checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "T8", null, locks.get(2));
- txnMgr.getLockManager().releaseLocks(deleteLocks);
- lockState = ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());
+ txnMgr.rollbackTxn();
+ ((DbLockManager)txnMgr2.getLockManager()).checkLock(locks.get(2).getLockid());
locks = getLocks();
Assert.assertEquals("Unexpected lock count", 2, locks.size());
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T8", null, locks.get(0));
checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "T8", null, locks.get(1));
- List<HiveLock> relLocks = new ArrayList<HiveLock>(2);
- relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
- relLocks.add(new DbLockManager.DbHiveLock(locks.get(1).getLockid()));
- txnMgr.getLockManager().releaseLocks(relLocks);
+ txnMgr2.commitTxn();
cpr = driver.run("drop table if exists T6");
locks = getLocks();
Assert.assertEquals("Unexpected number of locks found", 0, locks.size());
@@ -617,12 +634,12 @@ public class TestDbTxnManager2 {
txnMgr.getLockManager().releaseLocks(relLocks);
}
- private void checkLock(LockType type, LockState state, String db, String table, String partition, ShowLocksResponseElement l) {
- Assert.assertEquals(l.toString(),l.getType(), type);
- Assert.assertEquals(l.toString(),l.getState(), state);
- Assert.assertEquals(l.toString(), normalizeCase(l.getDbname()), normalizeCase(db));
- Assert.assertEquals(l.toString(), normalizeCase(l.getTablename()), normalizeCase(table));
- Assert.assertEquals(l.toString(), normalizeCase(l.getPartname()), normalizeCase(partition));
+ private void checkLock(LockType expectedType, LockState expectedState, String expectedDb, String expectedTable, String expectedPartition, ShowLocksResponseElement actual) {
+ Assert.assertEquals(actual.toString(), expectedType, actual.getType());
+ Assert.assertEquals(actual.toString(), expectedState,actual.getState());
+ Assert.assertEquals(actual.toString(), normalizeCase(expectedDb), normalizeCase(actual.getDbname()));
+ Assert.assertEquals(actual.toString(), normalizeCase(expectedTable), normalizeCase(actual.getTablename()));
+ Assert.assertEquals(actual.toString(), normalizeCase(expectedPartition), normalizeCase(actual.getPartname()));
}
private void checkCmdOnDriver(CommandProcessorResponse cpr) {
Assert.assertTrue(cpr.toString(), cpr.getResponseCode() == 0);
@@ -637,4 +654,541 @@ public class TestDbTxnManager2 {
ShowLocksResponse rsp = ((DbLockManager)txnMgr.getLockManager()).getLocks();
return rsp.getLocks();
}
+
+ /**
+ * txns update same resource but do not overlap in time - no conflict
+ */
+ @Test
+ public void testWriteSetTracking1() throws Exception {
+ CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " +
+ "partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+
+ checkCmdOnDriver(driver.compileAndRespond("select * from TAB_PART"));
+ txnMgr.openTxn("Nicholas");
+ checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "Nicholas");
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ txnMgr.commitTxn();
+ txnMgr2.openTxn("Alexandra");
+ checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "Nicholas");
+ txnMgr2.commitTxn();
+ }
+ /**
+ * txns overlap in time but do not update same resource - no conflict
+ */
+ @Test
+ public void testWriteSetTracking2() throws Exception {
+ CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " +
+ "partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ cpr = driver.run("create table if not exists TAB2 (a int, b int) partitioned by (p string) " +
+ "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ txnMgr.openTxn("Peter");
+ checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "Peter");
+ txnMgr2.openTxn("Catherine");
+ List<ShowLocksResponseElement> locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ //note that "update" uses dynamic partitioning thus lock is on the table not partition
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0));
+ txnMgr.commitTxn();
+ checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 9 where p = 'doh'"));
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "Catherine");
+ txnMgr2.commitTxn();
+ }
+
+ /**
+ * txns overlap and update the same resource - can't commit 2nd txn
+ */
+ @Test
+ public void testWriteSetTracking3() throws Exception {
+ CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " +
+ "partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+
+ txnMgr.openTxn("Known");
+ txnMgr2.openTxn("Unknown");
+ checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "Known");
+ List<ShowLocksResponseElement> locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0));
+ checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
+ ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Unknown", false);
+ locks = getLocks(txnMgr2);//should not matter which txnMgr is used here
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB_PART", null, locks.get(1));
+ txnMgr.commitTxn();
+ LockException expectedException = null;
+ try {
+ txnMgr2.commitTxn();
+ }
+ catch (LockException e) {
+ expectedException = e;
+ }
+ Assert.assertTrue("Didn't get exception", expectedException != null);
+ Assert.assertEquals("Got wrong message code", ErrorMsg.TXN_ABORTED, expectedException.getCanonicalErrorMsg());
+ Assert.assertEquals("Exception msg didn't match",
+ "Aborting [txnid:2,2] due to a write conflict on default/tab_part committed by [txnid:1,2]",
+ expectedException.getCause().getMessage());
+ }
+ /**
+ * txns overlap, update same resource, simulate multi-stmt txn case
+ * Also tests that we kill txn when it tries to acquire lock if we already know it will not be committed
+ */
+ @Test
+ public void testWriteSetTracking4() throws Exception {
+ Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " +
+ "partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ cpr = driver.run("create table if not exists TAB2 (a int, b int) partitioned by (p string) " +
+ "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+
+ txnMgr.openTxn("Long Running");
+ checkCmdOnDriver(driver.compileAndRespond("select a from TAB_PART where p = 'blah'"));
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "Long Running");
+ List<ShowLocksResponseElement> locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ //for some reason this just locks the table; if I alter table to add this partition, then
+ //we end up locking both table and partition with share_read. (Plan has 2 ReadEntities)...?
+ //same for other locks below
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0));
+
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ txnMgr2.openTxn("Short Running");
+ checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 7 where p = 'blah'"));//no such partition
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "Short Running");
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", null, locks.get(1));
+ //update stmt has p=blah, thus nothing is actually update and we generate empty dyn part list
+ Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(),
+ "default", "tab2", Collections.EMPTY_LIST));
+ txnMgr2.commitTxn();
+ //Short Running updated nothing, so we expect 0 rows in WRITE_SET
+ Assert.assertEquals( 0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+
+ txnMgr2.openTxn("T3");
+ checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 7 where p = 'two'"));//pretend this partition exists
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "T3");
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", null, locks.get(1));//since TAB2 is empty
+ //update stmt has p=blah, thus nothing is actually update and we generate empty dyn part list
+ Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(),
+ "default", "tab2", Collections.singletonList("p=two")));//simulate partition update
+ txnMgr2.commitTxn();
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+
+ AcidWriteSetService houseKeeper = new AcidWriteSetService();
+ TestTxnCommands2.runHouseKeeperService(houseKeeper, conf);
+ //since T3 overlaps with Long Running (still open) GC does nothing
+ Assert.assertEquals(1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 17 where a = 1"));//no rows match
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "Long Running");
+ //so generate empty Dyn Part call
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(),
+ "default", "tab2", Collections.EMPTY_LIST));
+ txnMgr.commitTxn();
+
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 0, locks.size());
+ TestTxnCommands2.runHouseKeeperService(houseKeeper, conf);
+ Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ }
+ /**
+ * overlapping txns updating the same resource but 1st one rolls back; 2nd commits
+ * @throws Exception
+ */
+ @Test
+ public void testWriteSetTracking5() throws Exception {
+ Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " +
+ "partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+
+ txnMgr.openTxn("Known");
+ txnMgr2.openTxn("Unknown");
+ checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "Known");
+ List<ShowLocksResponseElement> locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0));
+ checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
+ ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Unknown", false);
+ locks = getLocks(txnMgr2);//should not matter which txnMgr is used here
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB_PART", null, locks.get(1));
+ txnMgr.rollbackTxn();
+ Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ txnMgr2.commitTxn();//since conflicting txn rolled back, commit succeeds
+ Assert.assertEquals(1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ }
+ /**
+ * check that read query concurrent with txn works ok
+ */
+ @Test
+ public void testWriteSetTracking6() throws Exception {
+ Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ CommandProcessorResponse cpr = driver.run("create table if not exists TAB2(a int, b int) clustered " +
+ "by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ checkCmdOnDriver(driver.compileAndRespond("select * from TAB2 where a = 113"));
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "Works");
+ List<ShowLocksResponseElement> locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB2", null, locks.get(0));
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ txnMgr2.openTxn("Horton");
+ checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 17 where a = 101"));
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "Horton");
+ Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB2", null, locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", null, locks.get(1));
+ txnMgr2.commitTxn();//no conflict
+ Assert.assertEquals(1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB2", null, locks.get(0));
+ TestTxnCommands2.runHouseKeeperService(new AcidWriteSetService(), conf);
+ Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ }
+
+ /**
+ * 2 concurrent txns update different partitions of the same table and succeed
+ * @throws Exception
+ */
+ @Test
+ public void testWriteSetTracking7() throws Exception {
+ Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+ CommandProcessorResponse cpr = driver.run("create table if not exists tab2 (a int, b int) " +
+ "partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ checkCmdOnDriver(driver.run("insert into tab2 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+
+ //test with predicates such that partition pruning works
+ txnMgr2.openTxn("T2");
+ checkCmdOnDriver(driver.compileAndRespond("update tab2 set b = 7 where p='two'"));
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
+ List<ShowLocksResponseElement> locks = getLocks(txnMgr2);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=two", locks.get(0));
+
+ //now start concurrent txn
+ txnMgr.openTxn("T3");
+ checkCmdOnDriver(driver.compileAndRespond("update tab2 set b = 7 where p='one'"));
+ ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=two", locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=one", locks.get(1));
+
+ //this simulates the completion of txnid:2
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab2",
+ Collections.singletonList("p=two")));
+ txnMgr2.commitTxn();//txnid:2
+
+ locks = getLocks(txnMgr2);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=one", locks.get(0));
+ //completion of txnid:3
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab2",
+ Collections.singletonList("p=one")));
+ txnMgr.commitTxn();//txnid:3
+ //now both txns concurrently updated TAB2 but different partitions.
+
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u'"));
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='u'"));
+ //2 from txnid:1, 1 from txnid:2, 1 from txnid:3
+ Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+ 4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab2' and ctc_partition is not null"));
+
+ //================
+ //test with predicates such that partition pruning doesn't kick in
+ cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
+ "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:4
+ txnMgr2.openTxn("T5");
+ checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1"));
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "T5");
+ locks = getLocks(txnMgr2);
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+
+ //now start concurrent txn
+ txnMgr.openTxn("T6");
+ checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b = 2"));
+ ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T6", false);
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 4, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+ checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(2));
+ checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=one", locks.get(3));
+
+ //this simulates the completion of txnid:5
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=one")));
+ txnMgr2.commitTxn();//txnid:5
+
+ ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id)
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+ //completion of txnid:6
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=two")));
+ txnMgr.commitTxn();//txnid:6
+
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u' and ws_table='tab1'"));
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='u' and ws_table='tab1'"));
+ //2 from insert + 1 for each update stmt
+ Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+ 4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null"));
+ }
+ /**
+ * Concurrent updates with partition pruning predicate and w/o one
+ */
+ @Test
+ public void testWriteSetTracking8() throws Exception {
+ CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
+ "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ txnMgr2.openTxn("T2");
+ checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1"));
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
+ List<ShowLocksResponseElement> locks = getLocks(txnMgr2);
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+
+ //now start concurrent txn
+ txnMgr.openTxn("T3");
+ checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where p='two'"));
+ ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 3, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+ checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(2));
+
+ //this simulates the completion of txnid:2
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=one")));
+ txnMgr2.commitTxn();//txnid:2
+
+ ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id)
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ //completion of txnid:3
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=two")));
+ txnMgr.commitTxn();//txnid:3
+
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u' and ws_table='tab1'"));
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='u' and ws_table='tab1'"));
+ Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+ 4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null"));
+ }
+ /**
+ * Concurrent update/delete of different partitions - should pass
+ */
+ @Test
+ public void testWriteSetTracking9() throws Exception {
+ CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
+ "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ txnMgr2.openTxn("T2");
+ checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1"));
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
+ List<ShowLocksResponseElement> locks = getLocks(txnMgr2);
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+
+ //now start concurrent txn
+ txnMgr.openTxn("T3");
+ checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2"));
+ ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 3, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+ checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(2));
+
+ //this simulates the completion of txnid:2
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=one")));
+ txnMgr2.commitTxn();//txnid:2
+
+ ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id)
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ //completion of txnid:3
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=two")));
+ txnMgr.commitTxn();//txnid:3
+
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u' and ws_table='tab1'"));
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='u' and ws_table='tab1'"));
+ Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+ 4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null"));
+ }
+ /**
+ * Concurrent update/delete of same partition - should fail to commit
+ */
+ @Test
+ public void testWriteSetTracking10() throws Exception {
+ CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
+ "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ txnMgr2.openTxn("T2");
+ checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=2"));
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
+ List<ShowLocksResponseElement> locks = getLocks(txnMgr2);
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+
+ //now start concurrent txn
+ txnMgr.openTxn("T3");
+ checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2"));
+ ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 3, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+ checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(2));
+
+ //this simulates the completion of txnid:2
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=two")));
+ txnMgr2.commitTxn();//txnid:2
+
+ ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id)
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ //completion of txnid:3
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=two")));
+ LockException exception = null;
+ try {
+ txnMgr.commitTxn();//txnid:3
+ }
+ catch(LockException e) {
+ exception = e;
+ }
+ Assert.assertNotEquals("Expected exception", null, exception);
+ Assert.assertEquals("Exception msg doesn't match",
+ "Aborting [txnid:3,3] due to a write conflict on default/tab1/p=two committed by [txnid:2,3]",
+ exception.getCause().getMessage());
+
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='u' and ws_table='tab1'"));
+ Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+ 3, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null"));
+ }
+ /**
+ * Concurrent delte/detele of same partition - should pass
+ * This test doesn't work yet, because we don't yet pass in operation type
+ *
+ * todo: Concurrent insert/update of same partition - should pass
+ */
+ @Ignore("HIVE-13622")
+ @Test
+ public void testWriteSetTracking11() throws Exception {
+ CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
+ "clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ checkCmdOnDriver(cpr);
+ checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ txnMgr2.openTxn("T2");
+ checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where b=2"));
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
+ List<ShowLocksResponseElement> locks = getLocks(txnMgr2);
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+
+ //now start concurrent txn
+ txnMgr.openTxn("T3");
+ checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2"));
+ ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 3, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+ checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(2));
+
+ //this simulates the completion of txnid:2
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=two")));
+ txnMgr2.commitTxn();//txnid:2
+
+ ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id)
+ locks = getLocks(txnMgr);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+ //completion of txnid:3
+ txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
+ Collections.singletonList("p=two")));
+ LockException exception = null;
+ try {
+ txnMgr.commitTxn();//txnid:3
+ }
+ catch(LockException e) {
+ exception = e;
+ }
+ Assert.assertNotEquals("Expected exception", null, exception);
+ Assert.assertEquals("Exception msg doesn't match",
+ "Aborting [txnid:3,3] due to a write conflict on default/tab1/p=two committed by [txnid:2,3]",
+ exception.getCause().getMessage());
+
+ //todo: this currently fails since we don't yet set operation type properly
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1'"));
+ Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1'"));
+ Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+ 4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null"));
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index a247065..1578bfb 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.hive.metastore.api.LockLevel;
import org.apache.hadoop.hive.metastore.api.LockRequest;
import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
+import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
@@ -261,6 +263,8 @@ public class TestCleaner extends CompactorTest {
List<LockComponent> components = new ArrayList<LockComponent>(1);
components.add(comp);
LockRequest req = new LockRequest(components, "me", "localhost");
+ OpenTxnsResponse resp = txnHandler.openTxns(new OpenTxnRequest(1, "Dracula", "Transylvania"));
+ req.setTxnid(resp.getTxn_ids().get(0));
LockResponse res = txnHandler.lock(req);
startCleaner();
[08/13] hive git commit: HIVE-13656 : need to set direct memory limit
higher in LlapServiceDriver for certain edge case configurations (Sergey
Shelukhin, reviewed by Vikram Dixit K and Siddharth Seth)
Posted by jd...@apache.org.
HIVE-13656 : need to set direct memory limit higher in LlapServiceDriver for certain edge case configurations (Sergey Shelukhin, reviewed by Vikram Dixit K and Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3517a99e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3517a99e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3517a99e
Branch: refs/heads/llap
Commit: 3517a99edde061596d62b41339bacb5aac0e8290
Parents: eb2c54b
Author: Sergey Shelukhin <se...@apache.org>
Authored: Thu May 5 17:01:47 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Thu May 5 17:02:36 2016 -0700
----------------------------------------------------------------------
.../hadoop/hive/llap/cli/LlapServiceDriver.java | 21 +++++++++++---------
llap-server/src/main/resources/package.py | 6 +++++-
2 files changed, 17 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/3517a99e/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
index de6d9b8..006f70f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
@@ -236,20 +236,22 @@ public class LlapServiceDriver {
String.valueOf(options.getIoThreads()));
}
+ long cache = -1, xmx = -1;
if (options.getCache() != -1) {
- conf.set(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname,
- Long.toString(options.getCache()));
+ cache = options.getCache();
+ conf.set(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname, Long.toString(cache));
propsDirectOptions.setProperty(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname,
- Long.toString(options.getCache()));
+ Long.toString(cache));
}
if (options.getXmx() != -1) {
// Needs more explanation here
- // Xmx is not the max heap value in JDK8
- // You need to subtract 50% of the survivor fraction from this, to get actual usable memory before it goes into GC
- long xmx = (long) (options.getXmx() / (1024 * 1024));
+ // Xmx is not the max heap value in JDK8. You need to subtract 50% of the survivor fraction
+ // from this, to get actual usable memory before it goes into GC
+ xmx = (long) (options.getXmx() / (1024 * 1024));
conf.setLong(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, xmx);
- propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, String.valueOf(xmx));
+ propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname,
+ String.valueOf(xmx));
}
if (options.getLlapQueueName() != null && !options.getLlapQueueName().isEmpty()) {
@@ -258,8 +260,6 @@ public class LlapServiceDriver {
.setProperty(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, options.getLlapQueueName());
}
-
-
URL logger = conf.getResource(LlapDaemon.LOG4j2_PROPERTIES_FILE);
if (null == logger) {
@@ -460,6 +460,9 @@ public class LlapServiceDriver {
configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, -1));
+ long maxDirect = (xmx > 0 && cache > 0 && xmx < cache * 1.25) ? (long)(cache * 1.25) : -1;
+ configs.put("max_direct_memory", Long.toString(maxDirect));
+
FSDataOutputStream os = lfs.create(new Path(tmpDir, "config.json"));
OutputStreamWriter w = new OutputStreamWriter(os);
configs.write(w);
http://git-wip-us.apache.org/repos/asf/hive/blob/3517a99e/llap-server/src/main/resources/package.py
----------------------------------------------------------------------
diff --git a/llap-server/src/main/resources/package.py b/llap-server/src/main/resources/package.py
index 63c0ef1..94c9d1a 100644
--- a/llap-server/src/main/resources/package.py
+++ b/llap-server/src/main/resources/package.py
@@ -101,6 +101,10 @@ def main(args):
return
config = json_parse(open(join(input, "config.json")).read())
java_home = config["java.home"]
+ max_direct_memory = config["max_direct_memory"]
+ daemon_args = args.args
+ if max_direct_memory > 0:
+ daemon_args = " -XX:MaxDirectMemorySize=%s %s" % (max_direct_memory, daemon_args)
resource = LlapResource(config)
# 5% container failure every monkey_interval seconds
monkey_percentage = 5 # 5%
@@ -114,7 +118,7 @@ def main(args):
"hadoop_home" : os.getenv("HADOOP_HOME"),
"java_home" : java_home,
"name" : resource.clusterName,
- "daemon_args" : args.args,
+ "daemon_args" : daemon_args,
"daemon_loglevel" : args.loglevel,
"queue.string" : resource.queueString,
"monkey_interval" : args.chaosmonkey,
[04/13] hive git commit: HIVE-13637: Fold CASE into NVL when CBO
optimized the plan (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)
Posted by jd...@apache.org.
HIVE-13637: Fold CASE into NVL when CBO optimized the plan (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/da82819b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/da82819b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/da82819b
Branch: refs/heads/llap
Commit: da82819bc112589e0d96874947c942e834681ed2
Parents: 10d0549
Author: Jesus Camacho Rodriguez <jc...@apache.org>
Authored: Wed May 4 01:27:30 2016 +0100
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Thu May 5 22:13:10 2016 +0100
----------------------------------------------------------------------
.../calcite/translator/JoinTypeCheckCtx.java | 2 +-
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 17 ++++++++-----
.../hadoop/hive/ql/parse/TypeCheckCtx.java | 19 +++++++++-----
.../hive/ql/parse/TypeCheckProcFactory.java | 26 ++++++++++++++++++++
.../queries/clientpositive/constantPropWhen.q | 2 ++
5 files changed, 53 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/da82819b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/JoinTypeCheckCtx.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/JoinTypeCheckCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/JoinTypeCheckCtx.java
index dccd1d9..f166bb6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/JoinTypeCheckCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/JoinTypeCheckCtx.java
@@ -53,7 +53,7 @@ public class JoinTypeCheckCtx extends TypeCheckCtx {
public JoinTypeCheckCtx(RowResolver leftRR, RowResolver rightRR, JoinType hiveJoinType)
throws SemanticException {
- super(RowResolver.getCombinedRR(leftRR, rightRR), true, false, false, false, false, false, false,
+ super(RowResolver.getCombinedRR(leftRR, rightRR), true, false, false, false, false, false, false, false,
false, false);
this.inputRRLst = ImmutableList.of(leftRR, rightRR);
this.outerJoin = (hiveJoinType == JoinType.LEFTOUTER) || (hiveJoinType == JoinType.RIGHTOUTER)
http://git-wip-us.apache.org/repos/asf/hive/blob/da82819b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 2983d38..f79a525 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -3143,8 +3143,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
OpParseContext inputCtx = opParseCtx.get(input);
RowResolver inputRR = inputCtx.getRowResolver();
Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
- new FilterDesc(genExprNodeDesc(condn, inputRR, useCaching), false), new RowSchema(
- inputRR.getColumnInfos()), input), inputRR);
+ new FilterDesc(genExprNodeDesc(condn, inputRR, useCaching, isCBOExecuted()), false),
+ new RowSchema(inputRR.getColumnInfos()), input), inputRR);
if (LOG.isDebugEnabled()) {
LOG.debug("Created Filter Plan for " + qb.getId() + " row schema: "
@@ -4146,7 +4146,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
expr, col_list, null, inputRR, starRR, pos, out_rwsch, qb.getAliases(), false);
} else {
// Case when this is an expression
- TypeCheckCtx tcCtx = new TypeCheckCtx(inputRR);
+ TypeCheckCtx tcCtx = new TypeCheckCtx(inputRR, true, isCBOExecuted());
// We allow stateful functions in the SELECT list (but nowhere else)
tcCtx.setAllowStatefulFunctions(true);
tcCtx.setAllowDistinctFunctions(false);
@@ -7777,7 +7777,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
List<ASTNode> expressions = joinTree.getExpressions().get(i);
joinKeys[i] = new ExprNodeDesc[expressions.size()];
for (int j = 0; j < joinKeys[i].length; j++) {
- joinKeys[i][j] = genExprNodeDesc(expressions.get(j), inputRR);
+ joinKeys[i][j] = genExprNodeDesc(expressions.get(j), inputRR, true, isCBOExecuted());
}
}
// Type checking and implicit type conversion for join keys
@@ -10999,12 +10999,17 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
throws SemanticException {
// Since the user didn't supply a customized type-checking context,
// use default settings.
- return genExprNodeDesc(expr, input, true);
+ return genExprNodeDesc(expr, input, true, false);
}
public ExprNodeDesc genExprNodeDesc(ASTNode expr, RowResolver input, boolean useCaching)
throws SemanticException {
- TypeCheckCtx tcCtx = new TypeCheckCtx(input, useCaching);
+ return genExprNodeDesc(expr, input, useCaching, false);
+ }
+
+ public ExprNodeDesc genExprNodeDesc(ASTNode expr, RowResolver input, boolean useCaching,
+ boolean foldExpr) throws SemanticException {
+ TypeCheckCtx tcCtx = new TypeCheckCtx(input, useCaching, foldExpr);
return genExprNodeDesc(expr, input, tcCtx);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/da82819b/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckCtx.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckCtx.java
index de1c043..02896ff 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckCtx.java
@@ -37,6 +37,8 @@ public class TypeCheckCtx implements NodeProcessorCtx {
private final boolean useCaching;
+ private final boolean foldExpr;
+
/**
* Receives translations which will need to be applied during unparse.
*/
@@ -79,20 +81,21 @@ public class TypeCheckCtx implements NodeProcessorCtx {
* The input row resolver of the previous operator.
*/
public TypeCheckCtx(RowResolver inputRR) {
- this(inputRR, true);
+ this(inputRR, true, false);
}
- public TypeCheckCtx(RowResolver inputRR, boolean useCaching) {
- this(inputRR, useCaching, false, true, true, true, true, true, true, true);
+ public TypeCheckCtx(RowResolver inputRR, boolean useCaching, boolean foldExpr) {
+ this(inputRR, useCaching, foldExpr, false, true, true, true, true, true, true, true);
}
- public TypeCheckCtx(RowResolver inputRR, boolean useCaching, boolean allowStatefulFunctions,
- boolean allowDistinctFunctions, boolean allowGBExprElimination, boolean allowAllColRef,
- boolean allowFunctionStar, boolean allowWindowing,
+ public TypeCheckCtx(RowResolver inputRR, boolean useCaching, boolean foldExpr,
+ boolean allowStatefulFunctions, boolean allowDistinctFunctions, boolean allowGBExprElimination,
+ boolean allowAllColRef, boolean allowFunctionStar, boolean allowWindowing,
boolean allowIndexExpr, boolean allowSubQueryExpr) {
setInputRR(inputRR);
error = null;
this.useCaching = useCaching;
+ this.foldExpr = foldExpr;
this.allowStatefulFunctions = allowStatefulFunctions;
this.allowDistinctFunctions = allowDistinctFunctions;
this.allowGBExprElimination = allowGBExprElimination;
@@ -209,4 +212,8 @@ public class TypeCheckCtx implements NodeProcessorCtx {
public boolean isUseCaching() {
return useCaching;
}
+
+ public boolean isFoldExpr() {
+ return foldExpr;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/da82819b/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java
index da236d5..ceeb9b4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java
@@ -61,9 +61,12 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.udf.SettableUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBaseCompare;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFNvl;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNot;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFWhen;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -1055,6 +1058,14 @@ public class TypeCheckProcFactory {
}
desc = ExprNodeGenericFuncDesc.newInstance(genericUDF, funcText,
childrenList);
+ } else if (ctx.isFoldExpr() && canConvertIntoNvl(genericUDF, children)) {
+ // Rewrite CASE into NVL
+ desc = ExprNodeGenericFuncDesc.newInstance(new GenericUDFNvl(),
+ Lists.newArrayList(children.get(0), new ExprNodeConstantDesc(false)));
+ if (Boolean.FALSE.equals(((ExprNodeConstantDesc) children.get(1)).getValue())) {
+ desc = ExprNodeGenericFuncDesc.newInstance(new GenericUDFOPNot(),
+ Lists.newArrayList(desc));
+ }
} else {
desc = ExprNodeGenericFuncDesc.newInstance(genericUDF, funcText,
children);
@@ -1072,6 +1083,21 @@ public class TypeCheckProcFactory {
return desc;
}
+ private boolean canConvertIntoNvl(GenericUDF genericUDF, ArrayList<ExprNodeDesc> children) {
+ if (genericUDF instanceof GenericUDFWhen && children.size() == 3 &&
+ children.get(1) instanceof ExprNodeConstantDesc &&
+ children.get(2) instanceof ExprNodeConstantDesc) {
+ ExprNodeConstantDesc constThen = (ExprNodeConstantDesc) children.get(1);
+ ExprNodeConstantDesc constElse = (ExprNodeConstantDesc) children.get(2);
+ Object thenVal = constThen.getValue();
+ Object elseVal = constElse.getValue();
+ if (thenVal instanceof Boolean && elseVal instanceof Boolean) {
+ return true;
+ }
+ }
+ return false;
+ }
+
/**
* Returns true if des is a descendant of ans (ancestor)
*/
http://git-wip-us.apache.org/repos/asf/hive/blob/da82819b/ql/src/test/queries/clientpositive/constantPropWhen.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/constantPropWhen.q b/ql/src/test/queries/clientpositive/constantPropWhen.q
index c1d4885..03bfd54 100644
--- a/ql/src/test/queries/clientpositive/constantPropWhen.q
+++ b/ql/src/test/queries/clientpositive/constantPropWhen.q
@@ -1,4 +1,5 @@
set hive.mapred.mode=nonstrict;
+set hive.optimize.constant.propagation=false;
drop table test_1;
@@ -24,6 +25,7 @@ SELECT cast(CASE id when id2 THEN TRUE ELSE FALSE END AS BOOLEAN) AS b FROM test
set hive.cbo.enable=false;
+set hive.optimize.constant.propagation=true;
explain SELECT cast(CASE WHEN id = id2 THEN FALSE ELSE TRUE END AS BOOLEAN) AS b FROM test_1;
[12/13] hive git commit: HIVE-13507: Improved logging for ptest
(Siddharth Seth, reviewd by Szehon Ho)
Posted by jd...@apache.org.
HIVE-13507: Improved logging for ptest (Siddharth Seth, reviewd by Szehon Ho)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3f07bfce
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3f07bfce
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3f07bfce
Branch: refs/heads/llap
Commit: 3f07bfcefce775dc77eca13cf623ccde94ff2494
Parents: 2b1e273
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri May 6 10:06:25 2016 -0500
Committer: Sergio Pena <se...@cloudera.com>
Committed: Fri May 6 10:06:25 2016 -0500
----------------------------------------------------------------------
.../hive/ptest/execution/ExecutionPhase.java | 2 +
.../hive/ptest/execution/HostExecutor.java | 48 ++++++++++++++++++--
.../hive/ptest/execution/LocalCommand.java | 31 +++++++++++--
.../apache/hive/ptest/execution/PrepPhase.java | 1 +
.../apache/hive/ptest/execution/conf/Host.java | 3 ++
5 files changed, 76 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/3f07bfce/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/ExecutionPhase.java
----------------------------------------------------------------------
diff --git a/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/ExecutionPhase.java b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/ExecutionPhase.java
index 3026ea0..6063afc 100644
--- a/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/ExecutionPhase.java
+++ b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/ExecutionPhase.java
@@ -86,6 +86,8 @@ public class ExecutionPhase extends Phase {
isolatedWorkQueue.add(batch);
}
}
+ logger.info("ParallelWorkQueueSize={}, IsolatedWorkQueueSize={}", parallelWorkQueue.size(),
+ isolatedWorkQueue.size());
try {
int expectedNumHosts = hostExecutors.size();
initalizeHosts();
http://git-wip-us.apache.org/repos/asf/hive/blob/3f07bfce/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/HostExecutor.java
----------------------------------------------------------------------
diff --git a/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/HostExecutor.java b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/HostExecutor.java
index b05d2c2..735b261 100644
--- a/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/HostExecutor.java
+++ b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/HostExecutor.java
@@ -29,6 +29,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
+import com.google.common.base.Stopwatch;
import org.apache.hive.ptest.execution.conf.Host;
import org.apache.hive.ptest.execution.conf.TestBatch;
import org.apache.hive.ptest.execution.ssh.RSyncCommand;
@@ -65,6 +66,8 @@ class HostExecutor {
private final File mFailedTestLogDir;
private final long mNumPollSeconds;
private volatile boolean mShutdown;
+ private int numParallelBatchesProcessed = 0;
+ private int numIsolatedBatchesProcessed = 0;
HostExecutor(Host host, String privateKey, ListeningExecutorService executor,
SSHCommandExecutor sshCommandExecutor,
@@ -100,7 +103,18 @@ class HostExecutor {
return mExecutor.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
- executeTests(parallelWorkQueue, isolatedWorkQueue, failedTestResults);
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ mLogger.info("Starting SubmitTests on host {}", getHost());
+ try {
+ executeTests(parallelWorkQueue, isolatedWorkQueue, failedTestResults);
+ } finally {
+ stopwatch.stop();
+ mLogger.info("Finishing submitTests on host: {}. ElapsedTime(seconds)={}," +
+ " NumParallelBatchesProcessed={}, NumIsolatedBatchesProcessed={}",
+ new Object[]{getHost().toString(),
+ stopwatch.elapsed(TimeUnit.SECONDS), numParallelBatchesProcessed,
+ numIsolatedBatchesProcessed});
+ }
return null;
}
@@ -143,6 +157,7 @@ class HostExecutor {
@Override
public Void call() throws Exception {
TestBatch batch = null;
+ Stopwatch sw = Stopwatch.createUnstarted();
try {
do {
batch = parallelWorkQueue.poll(mNumPollSeconds, TimeUnit.SECONDS);
@@ -151,8 +166,16 @@ class HostExecutor {
return null;
}
if(batch != null) {
- if(!executeTestBatch(drone, batch, failedTestResults)) {
- failedTestResults.add(batch);
+ numParallelBatchesProcessed++;
+ sw.reset().start();
+ try {
+ if (!executeTestBatch(drone, batch, failedTestResults)) {
+ failedTestResults.add(batch);
+ }
+ } finally {
+ sw.stop();
+ mLogger.info("Finished processing parallel batch [{}] on host {}. ElapsedTime(seconds)={}",
+ new Object[]{batch.getName(), getHost().toShortString(), sw.elapsed(TimeUnit.SECONDS)});
}
}
} while(!mShutdown && !parallelWorkQueue.isEmpty());
@@ -176,12 +199,22 @@ class HostExecutor {
mLogger.info("Starting isolated execution on " + mHost.getName());
for(Drone drone : ImmutableList.copyOf(mDrones)) {
TestBatch batch = null;
+ Stopwatch sw = Stopwatch.createUnstarted();
try {
do {
+
batch = isolatedWorkQueue.poll(mNumPollSeconds, TimeUnit.SECONDS);
if(batch != null) {
- if(!executeTestBatch(drone, batch, failedTestResults)) {
- failedTestResults.add(batch);
+ numIsolatedBatchesProcessed++;
+ sw.reset().start();
+ try {
+ if (!executeTestBatch(drone, batch, failedTestResults)) {
+ failedTestResults.add(batch);
+ }
+ } finally {
+ sw.stop();
+ mLogger.info("Finished processing isolated batch [{}] on host {}. ElapsedTime(seconds)={}",
+ new Object[]{batch.getName(), getHost().toShortString(), sw.elapsed(TimeUnit.SECONDS)});
}
}
} while(!mShutdown && !isolatedWorkQueue.isEmpty());
@@ -215,10 +248,15 @@ class HostExecutor {
Templates.writeTemplateResult("batch-exec.vm", script, templateVariables);
copyToDroneFromLocal(drone, script.getAbsolutePath(), "$localDir/$instanceName/scratch/" + scriptName);
script.delete();
+ Stopwatch sw = Stopwatch.createStarted();
mLogger.info(drone + " executing " + batch + " with " + command);
RemoteCommandResult sshResult = new SSHCommand(mSSHCommandExecutor, drone.getPrivateKey(), drone.getUser(),
drone.getHost(), drone.getInstance(), command, true).
call();
+ sw.stop();
+ mLogger.info("Completed executing tests for batch [{}] on host {}. ElapsedTime(seconds)={}",
+ new Object[]{batch.getName(),
+ getHost().toShortString(), sw.elapsed(TimeUnit.SECONDS)});
File batchLogDir = null;
if(sshResult.getExitCode() == Constants.EXIT_CODE_UNKNOWN) {
throw new AbortDroneException("Drone " + drone.toString() + " exited with " +
http://git-wip-us.apache.org/repos/asf/hive/blob/3f07bfce/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/LocalCommand.java
----------------------------------------------------------------------
diff --git a/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/LocalCommand.java b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/LocalCommand.java
index ec99656..de9fe68 100644
--- a/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/LocalCommand.java
+++ b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/LocalCommand.java
@@ -22,17 +22,28 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.base.Stopwatch;
import org.slf4j.Logger;
public class LocalCommand {
+ private static final AtomicInteger localCommandCounter = new AtomicInteger(0);
+
+ private final Logger logger;
private final Process process;
private final StreamReader streamReader;
private Integer exitCode;
+ private final int commandId;
+ private final Stopwatch stopwatch = Stopwatch.createUnstarted();
public LocalCommand(Logger logger, OutputPolicy outputPolicy, String command) throws IOException {
- logger.info("Starting " + command);
+ this.commandId = localCommandCounter.incrementAndGet();
+ this.logger = logger;
+ logger.info("Starting LocalCommandId={}: {}" + commandId, command);
+ stopwatch.start();
process = new ProcessBuilder().command(new String[] {"bash", "-c", command}).redirectErrorStream(true).start();
streamReader = new StreamReader(outputPolicy, process.getInputStream());
streamReader.setName("StreamReader-[" + command + "]");
@@ -42,13 +53,25 @@ public class LocalCommand {
public int getExitCode() throws InterruptedException {
synchronized (process) {
- if(exitCode == null) {
+ awaitProcessCompletion();
+ return exitCode;
+ }
+ }
+
+ private void awaitProcessCompletion() throws InterruptedException {
+ synchronized (process) {
+ if (exitCode == null) {
exitCode = process.waitFor();
+ if (stopwatch.isRunning()) {
+ stopwatch.stop();
+ logger.info("Finished LocalCommandId={}. ElapsedTime(seconds)={}", commandId,
+ stopwatch.elapsed(
+ TimeUnit.SECONDS));
+ }
}
- return exitCode;
}
}
-
+
public void kill() {
synchronized (process) {
process.destroy();
http://git-wip-us.apache.org/repos/asf/hive/blob/3f07bfce/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PrepPhase.java
----------------------------------------------------------------------
diff --git a/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PrepPhase.java b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PrepPhase.java
index 825f0c0..8fef413 100644
--- a/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PrepPhase.java
+++ b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PrepPhase.java
@@ -62,6 +62,7 @@ public class PrepPhase extends Phase {
// source prep
start = System.currentTimeMillis();
File sourcePrepScript = new File(mScratchDir, "source-prep.sh");
+ logger.info("Writing {} from template", sourcePrepScript);
Templates.writeTemplateResult("source-prep.vm", sourcePrepScript, getTemplateDefaults());
execLocally("bash " + sourcePrepScript.getPath());
logger.debug("Deleting " + sourcePrepScript + ": " + sourcePrepScript.delete());
http://git-wip-us.apache.org/repos/asf/hive/blob/3f07bfce/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/conf/Host.java
----------------------------------------------------------------------
diff --git a/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/conf/Host.java b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/conf/Host.java
index c1216c1..a56824c 100644
--- a/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/conf/Host.java
+++ b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/conf/Host.java
@@ -47,6 +47,9 @@ public class Host {
public String[] getLocalDirectories() {
return localDirectories;
}
+ public String toShortString() {
+ return name;
+ }
@Override
public String toString() {
return "Host [name=" + name + ", user=" + user + ", threads=" + threads
[11/13] hive git commit: HIVE-13679: Pass diagnostic message to
failure hooks (Jimmy Xiang, reviewed by Aihua Xu)
Posted by jd...@apache.org.
HIVE-13679: Pass diagnostic message to failure hooks (Jimmy Xiang, reviewed by Aihua Xu)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2b1e273e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2b1e273e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2b1e273e
Branch: refs/heads/llap
Commit: 2b1e273e44fe367c12167409e8552efa2770ae7e
Parents: b870d52
Author: Jimmy Xiang <jx...@apache.org>
Authored: Tue May 3 14:48:09 2016 -0700
Committer: Jimmy Xiang <jx...@apache.org>
Committed: Fri May 6 07:41:43 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/hadoop/hive/ql/Driver.java | 11 ++++++----
.../org/apache/hadoop/hive/ql/exec/Task.java | 21 ++++++++++++++++----
.../apache/hadoop/hive/ql/exec/TaskResult.java | 7 +++++--
.../apache/hadoop/hive/ql/exec/TaskRunner.java | 5 ++++-
.../hive/ql/exec/mr/HadoopJobExecHelper.java | 1 +
.../hadoop/hive/ql/exec/mr/JobDebugger.java | 18 +++++++++++------
6 files changed, 46 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/2b1e273e/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 6a610cb..3fecc5c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -118,6 +118,7 @@ import org.apache.hive.common.util.ShutdownHookManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
@@ -1598,7 +1599,8 @@ public class Driver implements CommandProcessor {
} else {
setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk);
- invokeFailureHooks(perfLogger, hookContext, result.getTaskError());
+ invokeFailureHooks(perfLogger, hookContext,
+ errorMessage + Strings.nullToEmpty(tsk.getDiagnosticsMessage()), result.getTaskError());
SQLState = "08S01";
console.printError(errorMessage);
driverCxt.shutdown();
@@ -1634,7 +1636,7 @@ public class Driver implements CommandProcessor {
if (driverCxt.isShutdown()) {
SQLState = "HY008";
errorMessage = "FAILED: Operation cancelled";
- invokeFailureHooks(perfLogger, hookContext, null);
+ invokeFailureHooks(perfLogger, hookContext, errorMessage, null);
console.printError(errorMessage);
return 1000;
}
@@ -1691,7 +1693,7 @@ public class Driver implements CommandProcessor {
errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
if (hookContext != null) {
try {
- invokeFailureHooks(perfLogger, hookContext, e);
+ invokeFailureHooks(perfLogger, hookContext, errorMessage, e);
} catch (Exception t) {
LOG.warn("Failed to invoke failure hook", t);
}
@@ -1790,7 +1792,8 @@ public class Driver implements CommandProcessor {
}
}
- private void invokeFailureHooks(PerfLogger perfLogger, HookContext hookContext, Throwable exception) throws Exception {
+ private void invokeFailureHooks(PerfLogger perfLogger,
+ HookContext hookContext, String errorMessage, Throwable exception) throws Exception {
hookContext.setHookType(HookContext.HookType.ON_FAILURE_HOOK);
hookContext.setErrorMessage(errorMessage);
hookContext.setException(exception);
http://git-wip-us.apache.org/repos/asf/hive/blob/2b1e273e/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index 34bdafd..eeaa543 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -27,10 +27,12 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.*;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.QueryDisplay;
+import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -40,6 +42,8 @@ import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Task implementation.
@@ -84,8 +88,17 @@ public abstract class Task<T extends Serializable> implements Serializable, Node
protected T work;
private TaskState taskState = TaskState.CREATED;
private String statusMessage;
+ private String diagnosticMesg;
private transient boolean fetchSource;
+ public void setDiagnosticMessage(String diagnosticMesg) {
+ this.diagnosticMesg = diagnosticMesg;
+ }
+
+ public String getDiagnosticsMessage() {
+ return diagnosticMesg;
+ }
+
public void setStatusMessage(String statusMessage) {
this.statusMessage = statusMessage;
updateStatusInQueryDisplay();
@@ -321,7 +334,7 @@ public abstract class Task<T extends Serializable> implements Serializable, Node
return ret;
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "rawtypes"})
public static List<Task<? extends Serializable>>
findLeafs(List<Task<? extends Serializable>> rootTasks) {
final List<Task<? extends Serializable>> leafTasks = new ArrayList<Task<?>>();
http://git-wip-us.apache.org/repos/asf/hive/blob/2b1e273e/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskResult.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskResult.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskResult.java
index def9389..3c4ee17 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskResult.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskResult.java
@@ -37,10 +37,13 @@ public class TaskResult {
this.exitVal = exitVal;
setRunning(false);
}
- public void setExitVal(int exitVal, Throwable taskError) {
- this.setExitVal(exitVal);
+ public void setTaskError(Throwable taskError) {
this.taskError = taskError;
}
+ public void setExitVal(int exitVal, Throwable taskError) {
+ setExitVal(exitVal);
+ setTaskError(taskError);
+ }
public int getExitVal() {
return exitVal;
http://git-wip-us.apache.org/repos/asf/hive/blob/2b1e273e/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
index 81f6db0..a596e92 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
@@ -104,7 +104,10 @@ public class TaskRunner extends Thread {
}
LOG.error("Error in executeTask", t);
}
- result.setExitVal(exitVal, tsk.getException());
+ result.setExitVal(exitVal);
+ if (tsk.getException() != null) {
+ result.setTaskError(tsk.getException());
+ }
}
public static long getTaskRunnerID () {
http://git-wip-us.apache.org/repos/asf/hive/blob/2b1e273e/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
index 11f5cfd..c15316bb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
@@ -574,6 +574,7 @@ public class HadoopJobExecHelper {
Thread t = new Thread(jd);
t.start();
t.join(HiveConf.getIntVar(job, HiveConf.ConfVars.JOB_DEBUG_TIMEOUT));
+ task.setDiagnosticMessage(jd.getDiagnosticMesg());
int ec = jd.getErrorCode();
if (ec > 0) {
returnVal = ec;
http://git-wip-us.apache.org/repos/asf/hive/blob/2b1e273e/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/JobDebugger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/JobDebugger.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/JobDebugger.java
index 6e4e3bf..d320536 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/JobDebugger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/JobDebugger.java
@@ -53,6 +53,7 @@ public class JobDebugger implements Runnable {
private final Map<String, Integer> failures = new HashMap<String, Integer>();
private final Set<String> successes = new HashSet<String>(); // Successful task ID's
private final Map<String, TaskInfo> taskIdToInfo = new HashMap<String, TaskInfo>();
+ private String diagnosticMesg;
private int maxFailures = 0;
// Used for showJobFailDebugInfo
@@ -115,7 +116,7 @@ public class JobDebugger implements Runnable {
public void run() {
try {
- showJobFailDebugInfo();
+ diagnosticMesg = showJobFailDebugInfo();
} catch (IOException e) {
console.printError(e.getMessage());
}
@@ -216,8 +217,7 @@ public class JobDebugger implements Runnable {
}
}
- @SuppressWarnings("deprecation")
- private void showJobFailDebugInfo() throws IOException {
+ private String showJobFailDebugInfo() throws IOException {
console.printError("Error during job, obtaining debugging information...");
if (!conf.get("mapred.job.tracker", "local").equals("local")) {
// Show Tracking URL for remotely running jobs.
@@ -241,7 +241,7 @@ public class JobDebugger implements Runnable {
}
if (failures.keySet().size() == 0) {
- return;
+ return null;
}
// Find the highest failure count
computeMaxFailures() ;
@@ -255,6 +255,7 @@ public class JobDebugger implements Runnable {
+ e.getMessage());
}
+ String msg = null;
for (String task : failures.keySet()) {
if (failures.get(task).intValue() == maxFailures) {
TaskInfo ti = taskIdToInfo.get(task);
@@ -303,14 +304,19 @@ public class JobDebugger implements Runnable {
for (String mesg : diagMesgs) {
sb.append(mesg + "\n");
}
- console.printError(sb.toString());
+ msg = sb.toString();
+ console.printError(msg);
}
// Only print out one task because that's good enough for debugging.
break;
}
}
- return;
+ return msg;
+ }
+
+ public String getDiagnosticMesg() {
+ return diagnosticMesg;
}
public int getErrorCode() {
[06/13] hive git commit: HIVE-13393: Beeline: Print help message for
the --incremental option (Vaibhav Gumashta reviewed by Thejas Nair)
Posted by jd...@apache.org.
HIVE-13393: Beeline: Print help message for the --incremental option (Vaibhav Gumashta reviewed by Thejas Nair)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/794f161c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/794f161c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/794f161c
Branch: refs/heads/llap
Commit: 794f161c136c4d99693eb60222c0f17b10948e0d
Parents: 4eb9603
Author: Vaibhav Gumashta <vg...@hortonworks.com>
Authored: Thu May 5 15:12:38 2016 -0700
Committer: Vaibhav Gumashta <vg...@hortonworks.com>
Committed: Thu May 5 15:12:38 2016 -0700
----------------------------------------------------------------------
beeline/src/main/resources/BeeLine.properties | 9 ++++++++-
1 file changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/794f161c/beeline/src/main/resources/BeeLine.properties
----------------------------------------------------------------------
diff --git a/beeline/src/main/resources/BeeLine.properties b/beeline/src/main/resources/BeeLine.properties
index a118c09..bc40685 100644
--- a/beeline/src/main/resources/BeeLine.properties
+++ b/beeline/src/main/resources/BeeLine.properties
@@ -171,7 +171,14 @@ cmd-usage: Usage: java org.apache.hive.cli.beeline.BeeLine \n \
\ --silent=[true/false] be more silent\n \
\ --autosave=[true/false] automatically save preferences\n \
\ --outputformat=[table/vertical/csv2/tsv2/dsv/csv/tsv] format mode for result display\n \
-\ Note that csv, and tsv are deprecated - use csv2, tsv2 instead\n\
+\ Note that csv, and tsv are deprecated - use csv2, tsv2 instead\n \
+\ --incremental=[true/false] Defaults to false. When set to false, the entire result set\n \
+\ is fetched and buffered before being displayed, yielding optimal\n \
+\ display column sizing. When set to true, result rows are displayed\n \
+\ immediately as they are fetched, yielding lower latency and\n \
+\ memory usage at the price of extra display column padding.\n \
+\ Setting --incremental=true is recommended if you encounter an OutOfMemory\n \
+\ on the client side (due to the fetched result set size being large).\n \
\ --truncateTable=[true/false] truncate table column when it exceeds length\n \
\ --delimiterForDSV=DELIMITER specify the delimiter for delimiter-separated values output format (default: |)\n \
\ --isolation=LEVEL set the transaction isolation level\n \
[13/13] hive git commit: Merge branch 'master' into llap
Posted by jd...@apache.org.
Merge branch 'master' into llap
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f089f2e6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f089f2e6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f089f2e6
Branch: refs/heads/llap
Commit: f089f2e64241592ecf8144d044bec8a0659ff422
Parents: 89ec219 3f07bfc
Author: Jason Dere <jd...@hortonworks.com>
Authored: Fri May 6 10:14:21 2016 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Fri May 6 10:14:21 2016 -0700
----------------------------------------------------------------------
beeline/src/main/resources/BeeLine.properties | 9 +-
cli/pom.xml | 6 +
common/pom.xml | 6 +
.../org/apache/hadoop/hive/common/LogUtils.java | 35 +-
.../org/apache/hadoop/hive/conf/HiveConf.java | 8 +
.../src/main/resources/hive-log4j2.properties | 2 +-
.../hadoop/hive/conf/TestHiveAsyncLogging.java | 49 ++
data/conf/hive-log4j2.properties | 2 +-
hcatalog/core/pom.xml | 6 +
.../hive/metastore/TestHiveMetaStoreTxns.java | 2 +-
llap-server/bin/llapDaemon.sh | 2 +-
.../hadoop/hive/llap/cli/LlapServiceDriver.java | 21 +-
.../hive/llap/daemon/impl/LlapDaemon.java | 10 +-
...doop-metrics2-llapdaemon.properties.template | 50 ++
...trics2-llaptaskscheduler.properties.template | 50 ++
.../hadoop-metrics2.properties.template | 50 --
.../main/resources/llap-cli-log4j2.properties | 2 +-
.../resources/llap-daemon-log4j2.properties | 4 +-
llap-server/src/main/resources/package.py | 6 +-
.../resources/llap-daemon-log4j2.properties | 4 +-
.../tezplugins/LlapTaskSchedulerService.java | 2 +-
.../metrics/LlapTaskSchedulerMetrics.java | 6 +-
metastore/pom.xml | 6 +
.../upgrade/derby/035-HIVE-13395.derby.sql | 11 +
.../upgrade/derby/hive-schema-2.1.0.derby.sql | 2 +-
.../derby/hive-txn-schema-1.3.0.derby.sql | 11 +-
.../derby/hive-txn-schema-2.1.0.derby.sql | 130 ++++
.../derby/upgrade-1.2.0-to-1.3.0.derby.sql | 1 +
.../derby/upgrade-2.0.0-to-2.1.0.derby.sql | 1 +
.../upgrade/mssql/020-HIVE-13395.mssql.sql | 9 +
.../upgrade/mssql/hive-schema-1.3.0.mssql.sql | 12 +-
.../upgrade/mssql/hive-schema-2.1.0.mssql.sql | 12 +-
.../mssql/upgrade-1.2.0-to-1.3.0.mssql.sql | 1 +
.../mssql/upgrade-2.0.0-to-2.1.0.mssql.sql | 1 +
.../upgrade/mysql/035-HIVE-13395.mysql.sql | 10 +
.../upgrade/mysql/hive-schema-2.1.0.mysql.sql | 2 +-
.../mysql/hive-txn-schema-1.3.0.mysql.sql | 10 +
.../mysql/hive-txn-schema-2.1.0.mysql.sql | 131 ++++
.../mysql/upgrade-1.2.0-to-1.3.0.mysql.sql | 1 +
.../mysql/upgrade-2.0.0-to-2.1.0.mysql.sql | 1 +
.../upgrade/oracle/035-HIVE-13395.oracle.sql | 10 +
.../upgrade/oracle/hive-schema-2.1.0.oracle.sql | 2 +-
.../oracle/hive-txn-schema-1.3.0.oracle.sql | 12 +-
.../oracle/hive-txn-schema-2.1.0.oracle.sql | 129 ++++
.../oracle/upgrade-1.2.0-to-1.3.0.oracle.sql | 1 +
.../oracle/upgrade-2.0.0-to-2.1.0.oracle.sql | 1 +
.../postgres/034-HIVE-13395.postgres.sql | 10 +
.../postgres/hive-schema-2.1.0.postgres.sql | 2 +-
.../postgres/hive-txn-schema-1.3.0.postgres.sql | 11 +-
.../postgres/hive-txn-schema-2.1.0.postgres.sql | 129 ++++
.../upgrade-1.2.0-to-1.3.0.postgres.sql | 1 +
.../upgrade-2.0.0-to-2.1.0.postgres.sql | 1 +
.../hadoop/hive/metastore/HiveMetaStore.java | 1 +
.../hadoop/hive/metastore/txn/TxnDbUtil.java | 130 ++--
.../hadoop/hive/metastore/txn/TxnHandler.java | 466 +++++++++++---
.../hadoop/hive/metastore/txn/TxnStore.java | 8 +-
.../hadoop/hive/metastore/txn/TxnUtils.java | 2 +
.../metastore/txn/TestCompactionTxnHandler.java | 6 +-
.../hive/metastore/txn/TestTxnHandler.java | 29 +-
pom.xml | 2 +
ql/pom.xml | 6 +
.../java/org/apache/hadoop/hive/ql/Driver.java | 11 +-
.../org/apache/hadoop/hive/ql/ErrorMsg.java | 2 +-
.../hadoop/hive/ql/exec/OperatorUtils.java | 2 +-
.../org/apache/hadoop/hive/ql/exec/Task.java | 21 +-
.../apache/hadoop/hive/ql/exec/TaskResult.java | 7 +-
.../apache/hadoop/hive/ql/exec/TaskRunner.java | 5 +-
.../hive/ql/exec/mr/HadoopJobExecHelper.java | 1 +
.../hadoop/hive/ql/exec/mr/JobDebugger.java | 18 +-
.../hadoop/hive/ql/lockmgr/DbLockManager.java | 5 +-
.../hadoop/hive/ql/lockmgr/DbTxnManager.java | 27 +-
.../calcite/translator/JoinTypeCheckCtx.java | 2 +-
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 17 +-
.../hadoop/hive/ql/parse/TypeCheckCtx.java | 19 +-
.../hive/ql/parse/TypeCheckProcFactory.java | 26 +
.../hadoop/hive/ql/txn/AcidWriteSetService.java | 61 ++
.../txn/compactor/HouseKeeperServiceBase.java | 2 +-
.../hadoop/hive/ql/txn/compactor/Initiator.java | 2 +-
.../hadoop/hive/ql/txn/compactor/Worker.java | 2 +-
.../main/resources/hive-exec-log4j2.properties | 2 +-
.../resources/tez-container-log4j2.properties | 2 +-
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 2 +-
.../apache/hadoop/hive/ql/io/TestAcidUtils.java | 20 +
.../hive/ql/lockmgr/TestDbTxnManager.java | 7 +
.../hive/ql/lockmgr/TestDbTxnManager2.java | 610 ++++++++++++++++++-
.../hive/ql/txn/compactor/TestCleaner.java | 4 +
.../queries/clientpositive/constantPropWhen.q | 2 +
.../hive/ptest/execution/ExecutionPhase.java | 2 +
.../hive/ptest/execution/HostExecutor.java | 48 +-
.../hive/ptest/execution/LocalCommand.java | 31 +-
.../apache/hive/ptest/execution/PrepPhase.java | 1 +
.../apache/hive/ptest/execution/conf/Host.java | 3 +
92 files changed, 2294 insertions(+), 313 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f089f2e6/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f089f2e6/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f089f2e6/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f089f2e6/pom.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f089f2e6/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java
----------------------------------------------------------------------
[10/13] hive git commit: HIVE-13027: Configuration changes to improve
logging performance (Prasanth Jayachandran reviewed by Sergey Shelukhin)
Posted by jd...@apache.org.
HIVE-13027: Configuration changes to improve logging performance (Prasanth Jayachandran reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b870d526
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b870d526
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b870d526
Branch: refs/heads/llap
Commit: b870d526edbac1831d66f2529cf1a854b57bddb2
Parents: 0cc4045
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Fri May 6 03:08:28 2016 -0500
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Fri May 6 03:09:40 2016 -0500
----------------------------------------------------------------------
cli/pom.xml | 6 +++
common/pom.xml | 6 +++
.../org/apache/hadoop/hive/common/LogUtils.java | 35 ++++++++++++--
.../org/apache/hadoop/hive/conf/HiveConf.java | 6 +++
.../src/main/resources/hive-log4j2.properties | 2 +-
.../hadoop/hive/conf/TestHiveAsyncLogging.java | 49 ++++++++++++++++++++
data/conf/hive-log4j2.properties | 2 +-
hcatalog/core/pom.xml | 6 +++
llap-server/bin/llapDaemon.sh | 2 +-
.../hive/llap/daemon/impl/LlapDaemon.java | 10 ++--
.../main/resources/llap-cli-log4j2.properties | 2 +-
.../resources/llap-daemon-log4j2.properties | 4 +-
.../resources/llap-daemon-log4j2.properties | 4 +-
metastore/pom.xml | 6 +++
pom.xml | 2 +
ql/pom.xml | 6 +++
.../main/resources/hive-exec-log4j2.properties | 2 +-
.../resources/tez-container-log4j2.properties | 2 +-
18 files changed, 136 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/b870d526/cli/pom.xml
----------------------------------------------------------------------
diff --git a/cli/pom.xml b/cli/pom.xml
index 76f6d11..6f2e664 100644
--- a/cli/pom.xml
+++ b/cli/pom.xml
@@ -82,6 +82,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>com.lmax</groupId>
+ <artifactId>disruptor</artifactId>
+ <version>${disruptor.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>jline</groupId>
<artifactId>jline</artifactId>
<version>${jline.version}</version>
http://git-wip-us.apache.org/repos/asf/hive/blob/b870d526/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index 67aab7c..9933072 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -164,6 +164,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>com.lmax</groupId>
+ <artifactId>disruptor</artifactId>
+ <version>${disruptor.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>${json.version}</version>
http://git-wip-us.apache.org/repos/asf/hive/blob/b870d526/common/src/java/org/apache/hadoop/hive/common/LogUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/LogUtils.java b/common/src/java/org/apache/hadoop/hive/common/LogUtils.java
index adcf805..599e798 100644
--- a/common/src/java/org/apache/hadoop/hive/common/LogUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/LogUtils.java
@@ -21,12 +21,18 @@ package org.apache.hadoop.hive.common;
import java.io.File;
import java.net.URL;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.Configurator;
+import org.apache.logging.log4j.core.impl.Log4jContextFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* Utilities common to logging operations.
*/
@@ -66,8 +72,14 @@ public class LogUtils {
}
private static String initHiveLog4jCommon(ConfVars confVarName)
- throws LogInitializationException {
+ throws LogInitializationException {
HiveConf conf = new HiveConf();
+ return initHiveLog4jCommon(conf, confVarName);
+ }
+
+ @VisibleForTesting
+ public static String initHiveLog4jCommon(HiveConf conf, ConfVars confVarName)
+ throws LogInitializationException {
if (HiveConf.getVar(conf, confVarName).equals("")) {
// if log4j configuration file not set, or could not found, use default setting
return initHiveLog4jDefault(conf, "", confVarName);
@@ -91,13 +103,28 @@ public class LogUtils {
}
System.setProperty(HiveConf.ConfVars.HIVEQUERYID.toString(), queryId);
}
+ final boolean async = checkAndSetAsyncLogging(conf);
Configurator.initialize(null, log4jFileName);
logConfigLocation(conf);
- return ("Logging initialized using configuration in " + log4jConfigFile);
+ return "Logging initialized using configuration in " + log4jConfigFile + " Async: " + async;
}
}
}
+ public static boolean checkAndSetAsyncLogging(final Configuration conf) {
+ final boolean asyncLogging = HiveConf.getBoolVar(conf, ConfVars.HIVE_ASYNC_LOG_ENABLED);
+ if (asyncLogging) {
+ System.setProperty("Log4jContextSelector",
+ "org.apache.logging.log4j.core.async.AsyncLoggerContextSelector");
+ // default is ClassLoaderContextSelector which is created during automatic logging
+ // initialization in a static initialization block.
+ // Changing ContextSelector at runtime requires creating new context factory which will
+ // internally create new context selector based on system property.
+ LogManager.setFactory(new Log4jContextFactory());
+ }
+ return asyncLogging;
+ }
+
private static String initHiveLog4jDefault(
HiveConf conf, String logMessage, ConfVars confVarName)
throws LogInitializationException {
@@ -118,9 +145,11 @@ public class LogUtils {
break;
}
if (hive_l4j != null) {
+ final boolean async = checkAndSetAsyncLogging(conf);
Configurator.initialize(null, hive_l4j.toString());
logConfigLocation(conf);
- return (logMessage + "\n" + "Logging initialized using configuration in " + hive_l4j);
+ return (logMessage + "\n" + "Logging initialized using configuration in " + hive_l4j +
+ " Async: " + async);
} else {
throw new LogInitializationException(
logMessage + "Unable to initialize logging using "
http://git-wip-us.apache.org/repos/asf/hive/blob/b870d526/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index bb74d99..07dff08 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1955,6 +1955,12 @@ public class HiveConf extends Configuration {
"If the property is not set, then logging will be initialized using hive-exec-log4j2.properties found on the classpath.\n" +
"If the property is set, the value must be a valid URI (java.net.URI, e.g. \"file:///tmp/my-logging.xml\"), \n" +
"which you can then extract a URL from and pass to PropertyConfigurator.configure(URL)."),
+ HIVE_ASYNC_LOG_ENABLED("hive.async.log.enabled", true,
+ "Whether to enable Log4j2's asynchronous logging. Asynchronous logging can give\n" +
+ " significant performance improvement as logging will be handled in separate thread\n" +
+ " that uses LMAX disruptor queue for buffering log messages.\n" +
+ " Refer https://logging.apache.org/log4j/2.x/manual/async.html for benefits and\n" +
+ " drawbacks."),
HIVE_LOG_EXPLAIN_OUTPUT("hive.log.explain.output", false,
"Whether to log explain output for every query.\n" +
http://git-wip-us.apache.org/repos/asf/hive/blob/b870d526/common/src/main/resources/hive-log4j2.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/hive-log4j2.properties b/common/src/main/resources/hive-log4j2.properties
index cf0369a..2f67be8 100644
--- a/common/src/main/resources/hive-log4j2.properties
+++ b/common/src/main/resources/hive-log4j2.properties
@@ -36,7 +36,7 @@ appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} [%t]: %p %c{2}: %m%n
# daily rolling file appender
-appender.DRFA.type = RollingFile
+appender.DRFA.type = RollingRandomAccessFile
appender.DRFA.name = DRFA
appender.DRFA.fileName = ${sys:hive.log.dir}/${sys:hive.log.file}
# Use %pid in the filePattern to append <process-id>@<host-name> to the filename if you want separate log files for different CLI session
http://git-wip-us.apache.org/repos/asf/hive/blob/b870d526/common/src/test/org/apache/hadoop/hive/conf/TestHiveAsyncLogging.java
----------------------------------------------------------------------
diff --git a/common/src/test/org/apache/hadoop/hive/conf/TestHiveAsyncLogging.java b/common/src/test/org/apache/hadoop/hive/conf/TestHiveAsyncLogging.java
new file mode 100644
index 0000000..e2631cf
--- /dev/null
+++ b/common/src/test/org/apache/hadoop/hive/conf/TestHiveAsyncLogging.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.conf;
+
+import org.apache.hadoop.hive.common.LogUtils;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.async.AsyncLoggerContextSelector;
+import org.apache.logging.log4j.core.impl.Log4jContextFactory;
+import org.apache.logging.log4j.core.selector.ClassLoaderContextSelector;
+import org.apache.logging.log4j.core.selector.ContextSelector;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+public class TestHiveAsyncLogging extends TestCase {
+
+ // this test requires disruptor jar in classpath
+ @Test
+ public void testAsyncLoggingInitialization() throws Exception {
+ HiveConf conf = new HiveConf();
+ conf.setBoolVar(ConfVars.HIVE_ASYNC_LOG_ENABLED, false);
+ LogUtils.initHiveLog4jCommon(conf, ConfVars.HIVE_LOG4J_FILE);
+ Log4jContextFactory log4jContextFactory = (Log4jContextFactory) LogManager.getFactory();
+ ContextSelector contextSelector = log4jContextFactory.getSelector();
+ assertTrue(contextSelector instanceof ClassLoaderContextSelector);
+
+ conf.setBoolVar(ConfVars.HIVE_ASYNC_LOG_ENABLED, true);
+ LogUtils.initHiveLog4jCommon(conf, ConfVars.HIVE_LOG4J_FILE);
+ log4jContextFactory = (Log4jContextFactory) LogManager.getFactory();
+ contextSelector = log4jContextFactory.getSelector();
+ assertTrue(contextSelector instanceof AsyncLoggerContextSelector);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/b870d526/data/conf/hive-log4j2.properties
----------------------------------------------------------------------
diff --git a/data/conf/hive-log4j2.properties b/data/conf/hive-log4j2.properties
index 6bace1f..f60d5be 100644
--- a/data/conf/hive-log4j2.properties
+++ b/data/conf/hive-log4j2.properties
@@ -35,7 +35,7 @@ appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} [%t]: %p %c{2}: %m%n
# daily rolling file appender
-appender.DRFA.type = RollingFile
+appender.DRFA.type = RollingRandomAccessFile
appender.DRFA.name = DRFA
appender.DRFA.fileName = ${sys:hive.log.dir}/${sys:hive.log.file}
appender.DRFA.filePattern = ${sys:hive.log.dir}/${sys:hive.log.file}.%d{yyyy-MM-dd}
http://git-wip-us.apache.org/repos/asf/hive/blob/b870d526/hcatalog/core/pom.xml
----------------------------------------------------------------------
diff --git a/hcatalog/core/pom.xml b/hcatalog/core/pom.xml
index 1e970bf..c9a6c01 100644
--- a/hcatalog/core/pom.xml
+++ b/hcatalog/core/pom.xml
@@ -131,6 +131,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>com.lmax</groupId>
+ <artifactId>disruptor</artifactId>
+ <version>${disruptor.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
http://git-wip-us.apache.org/repos/asf/hive/blob/b870d526/llap-server/bin/llapDaemon.sh
----------------------------------------------------------------------
diff --git a/llap-server/bin/llapDaemon.sh b/llap-server/bin/llapDaemon.sh
index 6f57998..566bbc8 100755
--- a/llap-server/bin/llapDaemon.sh
+++ b/llap-server/bin/llapDaemon.sh
@@ -113,7 +113,7 @@ case $startStop in
#rotate_log $logOut
echo starting llapdaemon, logging to $logLog and $logOut
export LLAP_DAEMON_LOGFILE=${LLAP_DAEMON_LOG_BASE}.log
- nohup nice -n $LLAP_DAEMON_NICENESS "$LLAP_DAEMON_BIN_HOME"/runLlapDaemon.sh run > "$logOut" 2>&1 < /dev/null &
+ nohup nice -n $LLAP_DAEMON_NICENESS "$LLAP_DAEMON_BIN_HOME"/runLlapDaemon.sh run >> "$logOut" 2>&1 < /dev/null &
echo $! > $pid
;;
http://git-wip-us.apache.org/repos/asf/hive/blob/b870d526/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index e662de9..2e07a8c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -31,6 +31,7 @@ import java.util.regex.Pattern;
import javax.management.ObjectName;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.LogUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.DaemonId;
@@ -119,7 +120,8 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
int mngPort, int shufflePort, int webPort, String appName) {
super("LlapDaemon");
- initializeLogging();
+ initializeLogging(daemonConf);
+
printAsciiArt();
Preconditions.checkArgument(numExecutors > 0);
@@ -264,13 +266,15 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
addIfService(amReporter);
}
- private void initializeLogging() {
+ private void initializeLogging(final Configuration conf) {
long start = System.currentTimeMillis();
URL llap_l4j2 = LlapDaemon.class.getClassLoader().getResource(LOG4j2_PROPERTIES_FILE);
if (llap_l4j2 != null) {
+ final boolean async = LogUtils.checkAndSetAsyncLogging(conf);
Configurator.initialize("LlapDaemonLog4j2", llap_l4j2.toString());
long end = System.currentTimeMillis();
- LOG.warn("LLAP daemon logging initialized from {} in {} ms", llap_l4j2, (end - start));
+ LOG.warn("LLAP daemon logging initialized from {} in {} ms. Async: {}",
+ llap_l4j2, (end - start), async);
} else {
throw new RuntimeException("Log initialization failed." +
" Unable to locate " + LOG4j2_PROPERTIES_FILE + " file in classpath");
http://git-wip-us.apache.org/repos/asf/hive/blob/b870d526/llap-server/src/main/resources/llap-cli-log4j2.properties
----------------------------------------------------------------------
diff --git a/llap-server/src/main/resources/llap-cli-log4j2.properties b/llap-server/src/main/resources/llap-cli-log4j2.properties
index 2f27b5e..c6b8f20 100644
--- a/llap-server/src/main/resources/llap-cli-log4j2.properties
+++ b/llap-server/src/main/resources/llap-cli-log4j2.properties
@@ -36,7 +36,7 @@ appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %p %c{2}: %m%n
# daily rolling file appender
-appender.DRFA.type = RollingFile
+appender.DRFA.type = RollingRandomAccessFile
appender.DRFA.name = DRFA
appender.DRFA.fileName = ${sys:hive.log.dir}/${sys:hive.log.file}
# Use %pid in the filePattern to append <process-id>@<host-name> to the filename if you want separate log files for different CLI session
http://git-wip-us.apache.org/repos/asf/hive/blob/b870d526/llap-server/src/main/resources/llap-daemon-log4j2.properties
----------------------------------------------------------------------
diff --git a/llap-server/src/main/resources/llap-daemon-log4j2.properties b/llap-server/src/main/resources/llap-daemon-log4j2.properties
index 268eb59..c5166e3 100644
--- a/llap-server/src/main/resources/llap-daemon-log4j2.properties
+++ b/llap-server/src/main/resources/llap-daemon-log4j2.properties
@@ -38,7 +38,7 @@ appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} [%t%x] %p %c{2} : %m%n
# rolling file appender
-appender.RFA.type = RollingFile
+appender.RFA.type = RollingRandomAccessFile
appender.RFA.name = RFA
appender.RFA.fileName = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.log.file}
appender.RFA.filePattern = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.log.file}_%i
@@ -51,7 +51,7 @@ appender.RFA.strategy.type = DefaultRolloverStrategy
appender.RFA.strategy.max = ${sys:llap.daemon.log.maxbackupindex}
# history file appender
-appender.HISTORYAPPENDER.type = RollingFile
+appender.HISTORYAPPENDER.type = RollingRandomAccessFile
appender.HISTORYAPPENDER.name = HISTORYAPPENDER
appender.HISTORYAPPENDER.fileName = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.historylog.file}
appender.HISTORYAPPENDER.filePattern = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.historylog.file}_%i
http://git-wip-us.apache.org/repos/asf/hive/blob/b870d526/llap-server/src/test/resources/llap-daemon-log4j2.properties
----------------------------------------------------------------------
diff --git a/llap-server/src/test/resources/llap-daemon-log4j2.properties b/llap-server/src/test/resources/llap-daemon-log4j2.properties
index 7b5f4ed..2714dbd 100644
--- a/llap-server/src/test/resources/llap-daemon-log4j2.properties
+++ b/llap-server/src/test/resources/llap-daemon-log4j2.properties
@@ -38,7 +38,7 @@ appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} [%t%x] %p %c{2} : %m%n
# rolling file appender
-appender.RFA.type = RollingFile
+appender.RFA.type = RollingRandomAccessFile
appender.RFA.name = RFA
appender.RFA.fileName = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.log.file}
appender.RFA.filePattern = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.log.file}_%i
@@ -51,7 +51,7 @@ appender.RFA.strategy.type = DefaultRolloverStrategy
appender.RFA.strategy.max = ${sys:llap.daemon.log.maxbackupindex}
# history file appender
-appender.HISTORYAPPENDER.type = RollingFile
+appender.HISTORYAPPENDER.type = RollingRandomAccessFile
appender.HISTORYAPPENDER.name = HISTORYAPPENDER
appender.HISTORYAPPENDER.fileName = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.historylog.file}
appender.HISTORYAPPENDER.filePattern = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.historylog.file}_%i
http://git-wip-us.apache.org/repos/asf/hive/blob/b870d526/metastore/pom.xml
----------------------------------------------------------------------
diff --git a/metastore/pom.xml b/metastore/pom.xml
index 8816829..3827a51 100644
--- a/metastore/pom.xml
+++ b/metastore/pom.xml
@@ -228,6 +228,12 @@
<version>${mockito-all.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.lmax</groupId>
+ <artifactId>disruptor</artifactId>
+ <version>${disruptor.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<profiles>
http://git-wip-us.apache.org/repos/asf/hive/blob/b870d526/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index dff2a72..bde6857 100644
--- a/pom.xml
+++ b/pom.xml
@@ -134,6 +134,8 @@
<hadoop.version>2.6.0</hadoop.version>
<hadoop.bin.path>${basedir}/${hive.path.to.root}/testutils/hadoop</hadoop.bin.path>
<hbase.version>1.1.1</hbase.version>
+ <!-- required for logging test to avoid including hbase which pulls disruptor transitively -->
+ <disruptor.version>3.3.0</disruptor.version>
<!-- httpcomponents are not always in version sync -->
<httpcomponents.client.version>4.4</httpcomponents.client.version>
<httpcomponents.core.version>4.4</httpcomponents.core.version>
http://git-wip-us.apache.org/repos/asf/hive/blob/b870d526/ql/pom.xml
----------------------------------------------------------------------
diff --git a/ql/pom.xml b/ql/pom.xml
index aaa3271..8b2d0e6 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -325,6 +325,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>com.lmax</groupId>
+ <artifactId>disruptor</artifactId>
+ <version>${disruptor.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-all</artifactId>
<version>${groovy.version}</version>
http://git-wip-us.apache.org/repos/asf/hive/blob/b870d526/ql/src/main/resources/hive-exec-log4j2.properties
----------------------------------------------------------------------
diff --git a/ql/src/main/resources/hive-exec-log4j2.properties b/ql/src/main/resources/hive-exec-log4j2.properties
index 4fba04c..21e24fd 100644
--- a/ql/src/main/resources/hive-exec-log4j2.properties
+++ b/ql/src/main/resources/hive-exec-log4j2.properties
@@ -36,7 +36,7 @@ appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} [%t]: %p %c{2}: %m%n
# simple file appender
-appender.FA.type = File
+appender.FA.type = RandomAccessFile
appender.FA.name = FA
appender.FA.fileName = ${sys:hive.log.dir}/${sys:hive.log.file}
appender.FA.layout.type = PatternLayout
http://git-wip-us.apache.org/repos/asf/hive/blob/b870d526/ql/src/main/resources/tez-container-log4j2.properties
----------------------------------------------------------------------
diff --git a/ql/src/main/resources/tez-container-log4j2.properties b/ql/src/main/resources/tez-container-log4j2.properties
index 5d2b138..a048b17 100644
--- a/ql/src/main/resources/tez-container-log4j2.properties
+++ b/ql/src/main/resources/tez-container-log4j2.properties
@@ -28,7 +28,7 @@ property.tez.container.log.file = syslog
appenders = CLA
# daily rolling file appender
-appender.CLA.type = RollingFile
+appender.CLA.type = RollingRandomAccessFile
appender.CLA.name = CLA
appender.CLA.fileName = ${sys:tez.container.log.dir}/${sys:tez.container.log.file}
appender.CLA.filePattern = ${sys:tez.container.log.dir}/${sys:tez.container.log.file}.%d{yyyy-MM-dd}
[03/13] hive git commit: HIVE-13395 Lost Update problem in ACID
(Eugene Koifman, reviewed by Alan Gates)
Posted by jd...@apache.org.
HIVE-13395 Lost Update problem in ACID (Eugene Koifman, reviewed by Alan Gates)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/10d05491
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/10d05491
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/10d05491
Branch: refs/heads/llap
Commit: 10d05491379bb6f8e607a030811e8d4e530604de
Parents: 0927187
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Thu May 5 12:45:44 2016 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Thu May 5 12:45:44 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 2 +
.../hive/metastore/TestHiveMetaStoreTxns.java | 2 +-
.../upgrade/derby/035-HIVE-13395.derby.sql | 11 +
.../upgrade/derby/hive-schema-2.1.0.derby.sql | 2 +-
.../derby/hive-txn-schema-1.3.0.derby.sql | 11 +-
.../derby/hive-txn-schema-2.1.0.derby.sql | 130 ++++
.../derby/upgrade-1.2.0-to-1.3.0.derby.sql | 1 +
.../derby/upgrade-2.0.0-to-2.1.0.derby.sql | 1 +
.../upgrade/mssql/020-HIVE-13395.mssql.sql | 9 +
.../upgrade/mssql/hive-schema-1.3.0.mssql.sql | 12 +-
.../upgrade/mssql/hive-schema-2.1.0.mssql.sql | 12 +-
.../mssql/upgrade-1.2.0-to-1.3.0.mssql.sql | 1 +
.../mssql/upgrade-2.0.0-to-2.1.0.mssql.sql | 1 +
.../upgrade/mysql/035-HIVE-13395.mysql.sql | 10 +
.../upgrade/mysql/hive-schema-2.1.0.mysql.sql | 2 +-
.../mysql/hive-txn-schema-2.1.0.mysql.sql | 131 ++++
.../mysql/upgrade-1.2.0-to-1.3.0.mysql.sql | 1 +
.../mysql/upgrade-2.0.0-to-2.1.0.mysql.sql | 1 +
.../upgrade/oracle/035-HIVE-13395.oracle.sql | 10 +
.../upgrade/oracle/hive-schema-2.1.0.oracle.sql | 2 +-
.../oracle/hive-txn-schema-1.3.0.oracle.sql | 12 +-
.../oracle/hive-txn-schema-2.1.0.oracle.sql | 129 ++++
.../oracle/upgrade-1.2.0-to-1.3.0.oracle.sql | 1 +
.../oracle/upgrade-2.0.0-to-2.1.0.oracle.sql | 1 +
.../postgres/034-HIVE-13395.postgres.sql | 10 +
.../postgres/hive-schema-2.1.0.postgres.sql | 2 +-
.../postgres/hive-txn-schema-1.3.0.postgres.sql | 11 +-
.../postgres/hive-txn-schema-2.1.0.postgres.sql | 129 ++++
.../upgrade-1.2.0-to-1.3.0.postgres.sql | 1 +
.../upgrade-2.0.0-to-2.1.0.postgres.sql | 1 +
.../hadoop/hive/metastore/HiveMetaStore.java | 1 +
.../hadoop/hive/metastore/txn/TxnDbUtil.java | 130 ++--
.../hadoop/hive/metastore/txn/TxnHandler.java | 466 +++++++++++---
.../hadoop/hive/metastore/txn/TxnStore.java | 8 +-
.../hadoop/hive/metastore/txn/TxnUtils.java | 2 +
.../metastore/txn/TestCompactionTxnHandler.java | 6 +-
.../hive/metastore/txn/TestTxnHandler.java | 29 +-
.../org/apache/hadoop/hive/ql/ErrorMsg.java | 2 +-
.../hadoop/hive/ql/lockmgr/DbLockManager.java | 5 +-
.../hadoop/hive/ql/lockmgr/DbTxnManager.java | 27 +-
.../hadoop/hive/ql/txn/AcidWriteSetService.java | 61 ++
.../txn/compactor/HouseKeeperServiceBase.java | 2 +-
.../hadoop/hive/ql/txn/compactor/Initiator.java | 2 +-
.../hadoop/hive/ql/txn/compactor/Worker.java | 2 +-
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 2 +-
.../apache/hadoop/hive/ql/io/TestAcidUtils.java | 20 +
.../hive/ql/lockmgr/TestDbTxnManager.java | 7 +
.../hive/ql/lockmgr/TestDbTxnManager2.java | 610 ++++++++++++++++++-
.../hive/ql/txn/compactor/TestCleaner.java | 4 +
49 files changed, 1843 insertions(+), 192 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 06a6906..bb74d99 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1769,6 +1769,8 @@ public class HiveConf extends Configuration {
new TimeValidator(TimeUnit.MILLISECONDS), "Time delay of 1st reaper run after metastore start"),
HIVE_TIMEDOUT_TXN_REAPER_INTERVAL("hive.timedout.txn.reaper.interval", "180s",
new TimeValidator(TimeUnit.MILLISECONDS), "Time interval describing how often the reaper runs"),
+ WRITE_SET_REAPER_INTERVAL("hive.writeset.reaper.interval", "60s",
+ new TimeValidator(TimeUnit.MILLISECONDS), "Frequency of WriteSet reaper runs"),
// For HBase storage handler
HIVE_HBASE_WAL_ENABLED("hive.hbase.wal.enabled", true,
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
index e9ce789..22354ab 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
@@ -187,7 +187,7 @@ public class TestHiveMetaStoreTxns {
.setDbName("mydb")
.setTableName("mytable")
.setPartitionName("mypartition")
- .setExclusive()
+ .setSemiShared()
.build())
.addLockComponent(new LockComponentBuilder()
.setDbName("mydb")
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/derby/035-HIVE-13395.derby.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/derby/035-HIVE-13395.derby.sql b/metastore/scripts/upgrade/derby/035-HIVE-13395.derby.sql
new file mode 100644
index 0000000..df33b95
--- /dev/null
+++ b/metastore/scripts/upgrade/derby/035-HIVE-13395.derby.sql
@@ -0,0 +1,11 @@
+CREATE TABLE WRITE_SET (
+ WS_DATABASE varchar(128) NOT NULL,
+ WS_TABLE varchar(128) NOT NULL,
+ WS_PARTITION varchar(767),
+ WS_TXNID bigint NOT NULL,
+ WS_COMMIT_ID bigint NOT NULL,
+ WS_OPERATION_TYPE char(1) NOT NULL
+);
+ALTER TABLE TXN_COMPONENTS ADD TC_OPERATION_TYPE char(1);
+
+
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/derby/hive-schema-2.1.0.derby.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/derby/hive-schema-2.1.0.derby.sql b/metastore/scripts/upgrade/derby/hive-schema-2.1.0.derby.sql
index 1d00499..dc27afc 100644
--- a/metastore/scripts/upgrade/derby/hive-schema-2.1.0.derby.sql
+++ b/metastore/scripts/upgrade/derby/hive-schema-2.1.0.derby.sql
@@ -338,7 +338,7 @@ ALTER TABLE "APP"."SDS" ADD CONSTRAINT "SQL110318025505550" CHECK (IS_COMPRESSED
-- ----------------------------
-- Transaction and Lock Tables
-- ----------------------------
-RUN 'hive-txn-schema-2.0.0.derby.sql';
+RUN 'hive-txn-schema-2.1.0.derby.sql';
-- -----------------------------------------------------------------
-- Record schema version. Should be the last step in the init script
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/derby/hive-txn-schema-1.3.0.derby.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/derby/hive-txn-schema-1.3.0.derby.sql b/metastore/scripts/upgrade/derby/hive-txn-schema-1.3.0.derby.sql
index 13f3340..480c19e 100644
--- a/metastore/scripts/upgrade/derby/hive-txn-schema-1.3.0.derby.sql
+++ b/metastore/scripts/upgrade/derby/hive-txn-schema-1.3.0.derby.sql
@@ -32,7 +32,8 @@ CREATE TABLE TXN_COMPONENTS (
TC_TXNID bigint REFERENCES TXNS (TXN_ID),
TC_DATABASE varchar(128) NOT NULL,
TC_TABLE varchar(128),
- TC_PARTITION varchar(767)
+ TC_PARTITION varchar(767),
+ TC_OPERATION_TYPE char(1) NOT NULL
);
CREATE TABLE COMPLETED_TXN_COMPONENTS (
@@ -117,3 +118,11 @@ CREATE TABLE AUX_TABLE (
PRIMARY KEY(MT_KEY1, MT_KEY2)
);
+CREATE TABLE WRITE_SET (
+ WS_DATABASE varchar(128) NOT NULL,
+ WS_TABLE varchar(128) NOT NULL,
+ WS_PARTITION varchar(767),
+ WS_TXNID bigint NOT NULL,
+ WS_COMMIT_ID bigint NOT NULL,
+ WS_OPERATION_TYPE char(1) NOT NULL
+);
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/derby/hive-txn-schema-2.1.0.derby.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/derby/hive-txn-schema-2.1.0.derby.sql b/metastore/scripts/upgrade/derby/hive-txn-schema-2.1.0.derby.sql
new file mode 100644
index 0000000..11d86ca
--- /dev/null
+++ b/metastore/scripts/upgrade/derby/hive-txn-schema-2.1.0.derby.sql
@@ -0,0 +1,130 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the License); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an AS IS BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+--
+-- Tables for transaction management
+--
+CREATE TABLE TXNS (
+ TXN_ID bigint PRIMARY KEY,
+ TXN_STATE char(1) NOT NULL,
+ TXN_STARTED bigint NOT NULL,
+ TXN_LAST_HEARTBEAT bigint NOT NULL,
+ TXN_USER varchar(128) NOT NULL,
+ TXN_HOST varchar(128) NOT NULL,
+ TXN_AGENT_INFO varchar(128),
+ TXN_META_INFO varchar(128),
+ TXN_HEARTBEAT_COUNT integer
+);
+
+CREATE TABLE TXN_COMPONENTS (
+ TC_TXNID bigint REFERENCES TXNS (TXN_ID),
+ TC_DATABASE varchar(128) NOT NULL,
+ TC_TABLE varchar(128),
+ TC_PARTITION varchar(767),
+ TC_OPERATION_TYPE char(1) NOT NULL
+);
+
+CREATE TABLE COMPLETED_TXN_COMPONENTS (
+ CTC_TXNID bigint,
+ CTC_DATABASE varchar(128) NOT NULL,
+ CTC_TABLE varchar(128),
+ CTC_PARTITION varchar(767)
+);
+
+CREATE TABLE NEXT_TXN_ID (
+ NTXN_NEXT bigint NOT NULL
+);
+INSERT INTO NEXT_TXN_ID VALUES(1);
+
+CREATE TABLE HIVE_LOCKS (
+ HL_LOCK_EXT_ID bigint NOT NULL,
+ HL_LOCK_INT_ID bigint NOT NULL,
+ HL_TXNID bigint,
+ HL_DB varchar(128) NOT NULL,
+ HL_TABLE varchar(128),
+ HL_PARTITION varchar(767),
+ HL_LOCK_STATE char(1) NOT NULL,
+ HL_LOCK_TYPE char(1) NOT NULL,
+ HL_LAST_HEARTBEAT bigint NOT NULL,
+ HL_ACQUIRED_AT bigint,
+ HL_USER varchar(128) NOT NULL,
+ HL_HOST varchar(128) NOT NULL,
+ HL_HEARTBEAT_COUNT integer,
+ HL_AGENT_INFO varchar(128),
+ HL_BLOCKEDBY_EXT_ID bigint,
+ HL_BLOCKEDBY_INT_ID bigint,
+ PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID)
+);
+
+CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS (HL_TXNID);
+
+CREATE TABLE NEXT_LOCK_ID (
+ NL_NEXT bigint NOT NULL
+);
+INSERT INTO NEXT_LOCK_ID VALUES(1);
+
+CREATE TABLE COMPACTION_QUEUE (
+ CQ_ID bigint PRIMARY KEY,
+ CQ_DATABASE varchar(128) NOT NULL,
+ CQ_TABLE varchar(128) NOT NULL,
+ CQ_PARTITION varchar(767),
+ CQ_STATE char(1) NOT NULL,
+ CQ_TYPE char(1) NOT NULL,
+ CQ_WORKER_ID varchar(128),
+ CQ_START bigint,
+ CQ_RUN_AS varchar(128),
+ CQ_HIGHEST_TXN_ID bigint,
+ CQ_META_INFO varchar(2048) for bit data,
+ CQ_HADOOP_JOB_ID varchar(32)
+);
+
+CREATE TABLE NEXT_COMPACTION_QUEUE_ID (
+ NCQ_NEXT bigint NOT NULL
+);
+INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1);
+
+CREATE TABLE COMPLETED_COMPACTIONS (
+ CC_ID bigint PRIMARY KEY,
+ CC_DATABASE varchar(128) NOT NULL,
+ CC_TABLE varchar(128) NOT NULL,
+ CC_PARTITION varchar(767),
+ CC_STATE char(1) NOT NULL,
+ CC_TYPE char(1) NOT NULL,
+ CC_WORKER_ID varchar(128),
+ CC_START bigint,
+ CC_END bigint,
+ CC_RUN_AS varchar(128),
+ CC_HIGHEST_TXN_ID bigint,
+ CC_META_INFO varchar(2048) for bit data,
+ CC_HADOOP_JOB_ID varchar(32)
+);
+
+CREATE TABLE AUX_TABLE (
+ MT_KEY1 varchar(128) NOT NULL,
+ MT_KEY2 bigint NOT NULL,
+ MT_COMMENT varchar(255),
+ PRIMARY KEY(MT_KEY1, MT_KEY2)
+);
+
+--1st 4 cols make up a PK but since WS_PARTITION is nullable we can't declare such PK
+--This is a good candidate for Index orgainzed table
+CREATE TABLE WRITE_SET (
+ WS_DATABASE varchar(128) NOT NULL,
+ WS_TABLE varchar(128) NOT NULL,
+ WS_PARTITION varchar(767),
+ WS_TXNID bigint NOT NULL,
+ WS_COMMIT_ID bigint NOT NULL,
+ WS_OPERATION_TYPE char(1) NOT NULL
+);
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/derby/upgrade-1.2.0-to-1.3.0.derby.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/derby/upgrade-1.2.0-to-1.3.0.derby.sql b/metastore/scripts/upgrade/derby/upgrade-1.2.0-to-1.3.0.derby.sql
index 74ecac2..6b90b73 100644
--- a/metastore/scripts/upgrade/derby/upgrade-1.2.0-to-1.3.0.derby.sql
+++ b/metastore/scripts/upgrade/derby/upgrade-1.2.0-to-1.3.0.derby.sql
@@ -10,5 +10,6 @@ RUN '029-HIVE-12822.derby.sql';
RUN '030-HIVE-12823.derby.sql';
RUN '031-HIVE-12831.derby.sql';
RUN '032-HIVE-12832.derby.sql';
+RUN '035-HIVE-13395.derby.sql';
UPDATE "APP".VERSION SET SCHEMA_VERSION='1.3.0', VERSION_COMMENT='Hive release version 1.3.0' where VER_ID=1;
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/derby/upgrade-2.0.0-to-2.1.0.derby.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/derby/upgrade-2.0.0-to-2.1.0.derby.sql b/metastore/scripts/upgrade/derby/upgrade-2.0.0-to-2.1.0.derby.sql
index dde8c45..94c686b 100644
--- a/metastore/scripts/upgrade/derby/upgrade-2.0.0-to-2.1.0.derby.sql
+++ b/metastore/scripts/upgrade/derby/upgrade-2.0.0-to-2.1.0.derby.sql
@@ -1,5 +1,6 @@
-- Upgrade MetaStore schema from 2.0.0 to 2.1.0
RUN '033-HIVE-12892.derby.sql';
RUN '034-HIVE-13076.derby.sql';
+RUN '035-HIVE-13395.derby.sql';
UPDATE "APP".VERSION SET SCHEMA_VERSION='2.1.0', VERSION_COMMENT='Hive release version 2.1.0' where VER_ID=1;
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/mssql/020-HIVE-13395.mssql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mssql/020-HIVE-13395.mssql.sql b/metastore/scripts/upgrade/mssql/020-HIVE-13395.mssql.sql
new file mode 100644
index 0000000..281014c
--- /dev/null
+++ b/metastore/scripts/upgrade/mssql/020-HIVE-13395.mssql.sql
@@ -0,0 +1,9 @@
+CREATE TABLE WRITE_SET (
+ WS_DATABASE nvarchar(128) NOT NULL,
+ WS_TABLE nvarchar(128) NOT NULL,
+ WS_PARTITION nvarchar(767),
+ WS_TXNID bigint NOT NULL,
+ WS_COMMIT_ID bigint NOT NULL,
+ WS_OPERATION_TYPE char(1) NOT NULL
+);
+ALTER TABLE TXN_COMPONENTS ADD TC_OPERATION_TYPE char(1) NULL;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/mssql/hive-schema-1.3.0.mssql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mssql/hive-schema-1.3.0.mssql.sql b/metastore/scripts/upgrade/mssql/hive-schema-1.3.0.mssql.sql
index 57d2343..a184f24 100644
--- a/metastore/scripts/upgrade/mssql/hive-schema-1.3.0.mssql.sql
+++ b/metastore/scripts/upgrade/mssql/hive-schema-1.3.0.mssql.sql
@@ -964,7 +964,8 @@ CREATE TABLE TXN_COMPONENTS(
TC_TXNID bigint NULL,
TC_DATABASE nvarchar(128) NOT NULL,
TC_TABLE nvarchar(128) NULL,
- TC_PARTITION nvarchar(767) NULL
+ TC_PARTITION nvarchar(767) NULL,
+ TC_OPERATION_TYPE char(1) NOT NULL
);
ALTER TABLE TXN_COMPONENTS WITH CHECK ADD FOREIGN KEY(TC_TXNID) REFERENCES TXNS (TXN_ID);
@@ -980,6 +981,15 @@ CREATE TABLE AUX_TABLE (
)
);
+CREATE TABLE WRITE_SET (
+ WS_DATABASE nvarchar(128) NOT NULL,
+ WS_TABLE nvarchar(128) NOT NULL,
+ WS_PARTITION nvarchar(767),
+ WS_TXNID bigint NOT NULL,
+ WS_COMMIT_ID bigint NOT NULL,
+ WS_OPERATION_TYPE char(1) NOT NULL
+);
+
-- -----------------------------------------------------------------
-- Record schema version. Should be the last step in the init script
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/mssql/hive-schema-2.1.0.mssql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mssql/hive-schema-2.1.0.mssql.sql b/metastore/scripts/upgrade/mssql/hive-schema-2.1.0.mssql.sql
index 2d9cf76..d9194ff 100644
--- a/metastore/scripts/upgrade/mssql/hive-schema-2.1.0.mssql.sql
+++ b/metastore/scripts/upgrade/mssql/hive-schema-2.1.0.mssql.sql
@@ -977,7 +977,8 @@ CREATE TABLE TXN_COMPONENTS(
TC_TXNID bigint NULL,
TC_DATABASE nvarchar(128) NOT NULL,
TC_TABLE nvarchar(128) NULL,
- TC_PARTITION nvarchar(767) NULL
+ TC_PARTITION nvarchar(767) NULL,
+ TC_OPERATION_TYPE char(1) NOT NULL
);
ALTER TABLE TXN_COMPONENTS WITH CHECK ADD FOREIGN KEY(TC_TXNID) REFERENCES TXNS (TXN_ID);
@@ -1011,6 +1012,15 @@ ALTER TABLE KEY_CONSTRAINTS ADD CONSTRAINT CONSTRAINTS_PK PRIMARY KEY (CONSTRAIN
CREATE INDEX CONSTRAINTS_PARENT_TBL_ID__INDEX ON KEY_CONSTRAINTS(PARENT_TBL_ID);
+CREATE TABLE WRITE_SET (
+ WS_DATABASE nvarchar(128) NOT NULL,
+ WS_TABLE nvarchar(128) NOT NULL,
+ WS_PARTITION nvarchar(767),
+ WS_TXNID bigint NOT NULL,
+ WS_COMMIT_ID bigint NOT NULL,
+ WS_OPERATION_TYPE char(1) NOT NULL
+);
+
-- -----------------------------------------------------------------
-- Record schema version. Should be the last step in the init script
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/mssql/upgrade-1.2.0-to-1.3.0.mssql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mssql/upgrade-1.2.0-to-1.3.0.mssql.sql b/metastore/scripts/upgrade/mssql/upgrade-1.2.0-to-1.3.0.mssql.sql
index b0f28bb..251e621 100644
--- a/metastore/scripts/upgrade/mssql/upgrade-1.2.0-to-1.3.0.mssql.sql
+++ b/metastore/scripts/upgrade/mssql/upgrade-1.2.0-to-1.3.0.mssql.sql
@@ -11,6 +11,7 @@ SELECT 'Upgrading MetaStore schema from 1.2.0 to 1.3.0' AS MESSAGE;
:r 015-HIVE-12823.mssql.sql;
:r 016-HIVE-12831.mssql.sql;
:r 017-HIVE-12832.mssql.sql;
+:r 020-HIVE-13395.mssql.sql;
UPDATE VERSION SET SCHEMA_VERSION='1.3.0', VERSION_COMMENT='Hive release version 1.3.0' where VER_ID=1;
SELECT 'Finished upgrading MetaStore schema from 1.2.0 to 1.3.0' AS MESSAGE;
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/mssql/upgrade-2.0.0-to-2.1.0.mssql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mssql/upgrade-2.0.0-to-2.1.0.mssql.sql b/metastore/scripts/upgrade/mssql/upgrade-2.0.0-to-2.1.0.mssql.sql
index 3e5cb30..c796126 100644
--- a/metastore/scripts/upgrade/mssql/upgrade-2.0.0-to-2.1.0.mssql.sql
+++ b/metastore/scripts/upgrade/mssql/upgrade-2.0.0-to-2.1.0.mssql.sql
@@ -2,6 +2,7 @@ SELECT 'Upgrading MetaStore schema from 2.0.0 to 2.1.0' AS MESSAGE;
:r 018-HIVE-12892.mssql.sql;
:r 019-HIVE-13076.mssql.sql;
+:r 020-HIVE-13395.mssql.sql;
UPDATE VERSION SET SCHEMA_VERSION='2.1.0', VERSION_COMMENT='Hive release version 2.1.0' where VER_ID=1;
SELECT 'Finished upgrading MetaStore schema from 2.0.0 to 2.1.0' AS MESSAGE;
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/mysql/035-HIVE-13395.mysql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mysql/035-HIVE-13395.mysql.sql b/metastore/scripts/upgrade/mysql/035-HIVE-13395.mysql.sql
new file mode 100644
index 0000000..586caef
--- /dev/null
+++ b/metastore/scripts/upgrade/mysql/035-HIVE-13395.mysql.sql
@@ -0,0 +1,10 @@
+CREATE TABLE WRITE_SET (
+ WS_DATABASE varchar(128) NOT NULL,
+ WS_TABLE varchar(128) NOT NULL,
+ WS_PARTITION varchar(767),
+ WS_TXNID bigint NOT NULL,
+ WS_COMMIT_ID bigint NOT NULL,
+ WS_OPERATION_TYPE char(1) NOT NULL
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
+ALTER TABLE TXN_COMPONENTS ADD TC_OPERATION_TYPE char(1);
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/mysql/hive-schema-2.1.0.mysql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mysql/hive-schema-2.1.0.mysql.sql b/metastore/scripts/upgrade/mysql/hive-schema-2.1.0.mysql.sql
index 466e950..a6b783c 100644
--- a/metastore/scripts/upgrade/mysql/hive-schema-2.1.0.mysql.sql
+++ b/metastore/scripts/upgrade/mysql/hive-schema-2.1.0.mysql.sql
@@ -839,7 +839,7 @@ CREATE INDEX `CONSTRAINTS_PARENT_TABLE_ID_INDEX` ON KEY_CONSTRAINTS (`PARENT_TBL
-- ----------------------------
-- Transaction and Lock Tables
-- ----------------------------
-SOURCE hive-txn-schema-2.0.0.mysql.sql;
+SOURCE hive-txn-schema-2.1.0.mysql.sql;
-- -----------------------------------------------------------------
-- Record schema version. Should be the last step in the init script
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/mysql/hive-txn-schema-2.1.0.mysql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mysql/hive-txn-schema-2.1.0.mysql.sql b/metastore/scripts/upgrade/mysql/hive-txn-schema-2.1.0.mysql.sql
new file mode 100644
index 0000000..369d6bb
--- /dev/null
+++ b/metastore/scripts/upgrade/mysql/hive-txn-schema-2.1.0.mysql.sql
@@ -0,0 +1,131 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+--
+-- Tables for transaction management
+--
+
+CREATE TABLE TXNS (
+ TXN_ID bigint PRIMARY KEY,
+ TXN_STATE char(1) NOT NULL,
+ TXN_STARTED bigint NOT NULL,
+ TXN_LAST_HEARTBEAT bigint NOT NULL,
+ TXN_USER varchar(128) NOT NULL,
+ TXN_HOST varchar(128) NOT NULL,
+ TXN_AGENT_INFO varchar(128),
+ TXN_META_INFO varchar(128),
+ TXN_HEARTBEAT_COUNT int
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
+CREATE TABLE TXN_COMPONENTS (
+ TC_TXNID bigint NOT NULL,
+ TC_DATABASE varchar(128) NOT NULL,
+ TC_TABLE varchar(128) NOT NULL,
+ TC_PARTITION varchar(767),
+ TC_OPERATION_TYPE char(1) NOT NULL,
+ FOREIGN KEY (TC_TXNID) REFERENCES TXNS (TXN_ID)
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
+CREATE TABLE COMPLETED_TXN_COMPONENTS (
+ CTC_TXNID bigint NOT NULL,
+ CTC_DATABASE varchar(128) NOT NULL,
+ CTC_TABLE varchar(128),
+ CTC_PARTITION varchar(767)
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
+CREATE TABLE NEXT_TXN_ID (
+ NTXN_NEXT bigint NOT NULL
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+INSERT INTO NEXT_TXN_ID VALUES(1);
+
+CREATE TABLE HIVE_LOCKS (
+ HL_LOCK_EXT_ID bigint NOT NULL,
+ HL_LOCK_INT_ID bigint NOT NULL,
+ HL_TXNID bigint,
+ HL_DB varchar(128) NOT NULL,
+ HL_TABLE varchar(128),
+ HL_PARTITION varchar(767),
+ HL_LOCK_STATE char(1) not null,
+ HL_LOCK_TYPE char(1) not null,
+ HL_LAST_HEARTBEAT bigint NOT NULL,
+ HL_ACQUIRED_AT bigint,
+ HL_USER varchar(128) NOT NULL,
+ HL_HOST varchar(128) NOT NULL,
+ HL_HEARTBEAT_COUNT int,
+ HL_AGENT_INFO varchar(128),
+ HL_BLOCKEDBY_EXT_ID bigint,
+ HL_BLOCKEDBY_INT_ID bigint,
+ PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID),
+ KEY HIVE_LOCK_TXNID_INDEX (HL_TXNID)
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
+CREATE INDEX HL_TXNID_IDX ON HIVE_LOCKS (HL_TXNID);
+
+CREATE TABLE NEXT_LOCK_ID (
+ NL_NEXT bigint NOT NULL
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+INSERT INTO NEXT_LOCK_ID VALUES(1);
+
+CREATE TABLE COMPACTION_QUEUE (
+ CQ_ID bigint PRIMARY KEY,
+ CQ_DATABASE varchar(128) NOT NULL,
+ CQ_TABLE varchar(128) NOT NULL,
+ CQ_PARTITION varchar(767),
+ CQ_STATE char(1) NOT NULL,
+ CQ_TYPE char(1) NOT NULL,
+ CQ_WORKER_ID varchar(128),
+ CQ_START bigint,
+ CQ_RUN_AS varchar(128),
+ CQ_HIGHEST_TXN_ID bigint,
+ CQ_META_INFO varbinary(2048),
+ CQ_HADOOP_JOB_ID varchar(32)
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
+CREATE TABLE COMPLETED_COMPACTIONS (
+ CC_ID bigint PRIMARY KEY,
+ CC_DATABASE varchar(128) NOT NULL,
+ CC_TABLE varchar(128) NOT NULL,
+ CC_PARTITION varchar(767),
+ CC_STATE char(1) NOT NULL,
+ CC_TYPE char(1) NOT NULL,
+ CC_WORKER_ID varchar(128),
+ CC_START bigint,
+ CC_END bigint,
+ CC_RUN_AS varchar(128),
+ CC_HIGHEST_TXN_ID bigint,
+ CC_META_INFO varbinary(2048),
+ CC_HADOOP_JOB_ID varchar(32)
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
+CREATE TABLE NEXT_COMPACTION_QUEUE_ID (
+ NCQ_NEXT bigint NOT NULL
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1);
+
+CREATE TABLE AUX_TABLE (
+ MT_KEY1 varchar(128) NOT NULL,
+ MT_KEY2 bigint NOT NULL,
+ MT_COMMENT varchar(255),
+ PRIMARY KEY(MT_KEY1, MT_KEY2)
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
+CREATE TABLE WRITE_SET (
+ WS_DATABASE varchar(128) NOT NULL,
+ WS_TABLE varchar(128) NOT NULL,
+ WS_PARTITION varchar(767),
+ WS_TXNID bigint NOT NULL,
+ WS_COMMIT_ID bigint NOT NULL,
+ WS_OPERATION_TYPE char(1) NOT NULL
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/mysql/upgrade-1.2.0-to-1.3.0.mysql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mysql/upgrade-1.2.0-to-1.3.0.mysql.sql b/metastore/scripts/upgrade/mysql/upgrade-1.2.0-to-1.3.0.mysql.sql
index 477c10b..b65aee5 100644
--- a/metastore/scripts/upgrade/mysql/upgrade-1.2.0-to-1.3.0.mysql.sql
+++ b/metastore/scripts/upgrade/mysql/upgrade-1.2.0-to-1.3.0.mysql.sql
@@ -11,6 +11,7 @@ SOURCE 029-HIVE-12822.mysql.sql;
SOURCE 030-HIVE-12823.mysql.sql;
SOURCE 031-HIVE-12831.mysql.sql;
SOURCE 032-HIVE-12832.mysql.sql;
+SOURCE 035-HIVE-13395.mysql.sql;
UPDATE VERSION SET SCHEMA_VERSION='1.3.0', VERSION_COMMENT='Hive release version 1.3.0' where VER_ID=1;
SELECT 'Finished upgrading MetaStore schema from 1.2.0 to 1.3.0' AS ' ';
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/mysql/upgrade-2.0.0-to-2.1.0.mysql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mysql/upgrade-2.0.0-to-2.1.0.mysql.sql b/metastore/scripts/upgrade/mysql/upgrade-2.0.0-to-2.1.0.mysql.sql
index eb21f73..c3f83b3 100644
--- a/metastore/scripts/upgrade/mysql/upgrade-2.0.0-to-2.1.0.mysql.sql
+++ b/metastore/scripts/upgrade/mysql/upgrade-2.0.0-to-2.1.0.mysql.sql
@@ -2,6 +2,7 @@ SELECT 'Upgrading MetaStore schema from 2.0.0 to 2.1.0' AS ' ';
SOURCE 033-HIVE-12892.mysql.sql;
SOURCE 034-HIVE-13076.mysql.sql;
+SOURCE 035-HIVE-12295.mysql.sql;
UPDATE VERSION SET SCHEMA_VERSION='2.1.0', VERSION_COMMENT='Hive release version 2.1.0' where VER_ID=1;
SELECT 'Finished upgrading MetaStore schema from 2.0.0 to 2.1.0' AS ' ';
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/oracle/035-HIVE-13395.oracle.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/oracle/035-HIVE-13395.oracle.sql b/metastore/scripts/upgrade/oracle/035-HIVE-13395.oracle.sql
new file mode 100644
index 0000000..ad1bbd9
--- /dev/null
+++ b/metastore/scripts/upgrade/oracle/035-HIVE-13395.oracle.sql
@@ -0,0 +1,10 @@
+CREATE TABLE WRITE_SET (
+ WS_DATABASE varchar2(128) NOT NULL,
+ WS_TABLE varchar2(128) NOT NULL,
+ WS_PARTITION varchar2(767),
+ WS_TXNID number(19) NOT NULL,
+ WS_COMMIT_ID number(19) NOT NULL,
+ WS_OPERATION_TYPE char(1) NOT NULL
+);
+
+ALTER TABLE TXN_COMPONENTS ADD TC_OPERATION_TYPE char(1);
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/oracle/hive-schema-2.1.0.oracle.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/oracle/hive-schema-2.1.0.oracle.sql b/metastore/scripts/upgrade/oracle/hive-schema-2.1.0.oracle.sql
index f57e588..d003a16 100644
--- a/metastore/scripts/upgrade/oracle/hive-schema-2.1.0.oracle.sql
+++ b/metastore/scripts/upgrade/oracle/hive-schema-2.1.0.oracle.sql
@@ -808,7 +808,7 @@ CREATE INDEX CONSTRAINTS_PARENT_TBL_ID_INDEX ON KEY_CONSTRAINTS(PARENT_TBL_ID);
------------------------------
-- Transaction and lock tables
------------------------------
-@hive-txn-schema-2.0.0.oracle.sql;
+@hive-txn-schema-2.1.0.oracle.sql;
-- -----------------------------------------------------------------
-- Record schema version. Should be the last step in the init script
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/oracle/hive-txn-schema-1.3.0.oracle.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/oracle/hive-txn-schema-1.3.0.oracle.sql b/metastore/scripts/upgrade/oracle/hive-txn-schema-1.3.0.oracle.sql
index 788741a..199ff4c 100644
--- a/metastore/scripts/upgrade/oracle/hive-txn-schema-1.3.0.oracle.sql
+++ b/metastore/scripts/upgrade/oracle/hive-txn-schema-1.3.0.oracle.sql
@@ -33,7 +33,8 @@ CREATE TABLE TXN_COMPONENTS (
TC_TXNID NUMBER(19) REFERENCES TXNS (TXN_ID),
TC_DATABASE VARCHAR2(128) NOT NULL,
TC_TABLE VARCHAR2(128),
- TC_PARTITION VARCHAR2(767) NULL
+ TC_PARTITION VARCHAR2(767) NULL,
+ TC_OPERATION_TYPE char(1) NOT NULL
) ROWDEPENDENCIES;
CREATE TABLE COMPLETED_TXN_COMPONENTS (
@@ -118,3 +119,12 @@ CREATE TABLE AUX_TABLE (
PRIMARY KEY(MT_KEY1, MT_KEY2)
);
+CREATE TABLE WRITE_SET (
+ WS_DATABASE varchar2(128) NOT NULL,
+ WS_TABLE varchar2(128) NOT NULL,
+ WS_PARTITION varchar2(767),
+ WS_TXNID number(19) NOT NULL,
+ WS_COMMIT_ID number(19) NOT NULL,
+ WS_OPERATION_TYPE char(1) NOT NULL
+);
+
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/oracle/hive-txn-schema-2.1.0.oracle.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/oracle/hive-txn-schema-2.1.0.oracle.sql b/metastore/scripts/upgrade/oracle/hive-txn-schema-2.1.0.oracle.sql
new file mode 100644
index 0000000..d39baab
--- /dev/null
+++ b/metastore/scripts/upgrade/oracle/hive-txn-schema-2.1.0.oracle.sql
@@ -0,0 +1,129 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the License); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an AS IS BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+--
+-- Tables for transaction management
+--
+
+CREATE TABLE TXNS (
+ TXN_ID NUMBER(19) PRIMARY KEY,
+ TXN_STATE char(1) NOT NULL,
+ TXN_STARTED NUMBER(19) NOT NULL,
+ TXN_LAST_HEARTBEAT NUMBER(19) NOT NULL,
+ TXN_USER varchar(128) NOT NULL,
+ TXN_HOST varchar(128) NOT NULL,
+ TXN_AGENT_INFO varchar2(128),
+ TXN_META_INFO varchar2(128),
+ TXN_HEARTBEAT_COUNT number(10)
+) ROWDEPENDENCIES;
+
+CREATE TABLE TXN_COMPONENTS (
+ TC_TXNID NUMBER(19) REFERENCES TXNS (TXN_ID),
+ TC_DATABASE VARCHAR2(128) NOT NULL,
+ TC_TABLE VARCHAR2(128),
+ TC_PARTITION VARCHAR2(767) NULL,
+ TC_OPERATION_TYPE char(1) NOT NULL
+) ROWDEPENDENCIES;
+
+CREATE TABLE COMPLETED_TXN_COMPONENTS (
+ CTC_TXNID NUMBER(19),
+ CTC_DATABASE varchar(128) NOT NULL,
+ CTC_TABLE varchar(128),
+ CTC_PARTITION varchar(767)
+) ROWDEPENDENCIES;
+
+CREATE TABLE NEXT_TXN_ID (
+ NTXN_NEXT NUMBER(19) NOT NULL
+);
+INSERT INTO NEXT_TXN_ID VALUES(1);
+
+CREATE TABLE HIVE_LOCKS (
+ HL_LOCK_EXT_ID NUMBER(19) NOT NULL,
+ HL_LOCK_INT_ID NUMBER(19) NOT NULL,
+ HL_TXNID NUMBER(19),
+ HL_DB VARCHAR2(128) NOT NULL,
+ HL_TABLE VARCHAR2(128),
+ HL_PARTITION VARCHAR2(767),
+ HL_LOCK_STATE CHAR(1) NOT NULL,
+ HL_LOCK_TYPE CHAR(1) NOT NULL,
+ HL_LAST_HEARTBEAT NUMBER(19) NOT NULL,
+ HL_ACQUIRED_AT NUMBER(19),
+ HL_USER varchar(128) NOT NULL,
+ HL_HOST varchar(128) NOT NULL,
+ HL_HEARTBEAT_COUNT number(10),
+ HL_AGENT_INFO varchar2(128),
+ HL_BLOCKEDBY_EXT_ID number(19),
+ HL_BLOCKEDBY_INT_ID number(19),
+ PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID)
+) ROWDEPENDENCIES;
+
+CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS (HL_TXNID);
+
+CREATE TABLE NEXT_LOCK_ID (
+ NL_NEXT NUMBER(19) NOT NULL
+);
+INSERT INTO NEXT_LOCK_ID VALUES(1);
+
+CREATE TABLE COMPACTION_QUEUE (
+ CQ_ID NUMBER(19) PRIMARY KEY,
+ CQ_DATABASE varchar(128) NOT NULL,
+ CQ_TABLE varchar(128) NOT NULL,
+ CQ_PARTITION varchar(767),
+ CQ_STATE char(1) NOT NULL,
+ CQ_TYPE char(1) NOT NULL,
+ CQ_WORKER_ID varchar(128),
+ CQ_START NUMBER(19),
+ CQ_RUN_AS varchar(128),
+ CQ_HIGHEST_TXN_ID NUMBER(19),
+ CQ_META_INFO BLOB,
+ CQ_HADOOP_JOB_ID varchar2(32)
+) ROWDEPENDENCIES;
+
+CREATE TABLE NEXT_COMPACTION_QUEUE_ID (
+ NCQ_NEXT NUMBER(19) NOT NULL
+);
+INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1);
+
+CREATE TABLE COMPLETED_COMPACTIONS (
+ CC_ID NUMBER(19) PRIMARY KEY,
+ CC_DATABASE varchar(128) NOT NULL,
+ CC_TABLE varchar(128) NOT NULL,
+ CC_PARTITION varchar(767),
+ CC_STATE char(1) NOT NULL,
+ CC_TYPE char(1) NOT NULL,
+ CC_WORKER_ID varchar(128),
+ CC_START NUMBER(19),
+ CC_END NUMBER(19),
+ CC_RUN_AS varchar(128),
+ CC_HIGHEST_TXN_ID NUMBER(19),
+ CC_META_INFO BLOB,
+ CC_HADOOP_JOB_ID varchar2(32)
+) ROWDEPENDENCIES;
+
+CREATE TABLE AUX_TABLE (
+ MT_KEY1 varchar2(128) NOT NULL,
+ MT_KEY2 number(19) NOT NULL,
+ MT_COMMENT varchar2(255),
+ PRIMARY KEY(MT_KEY1, MT_KEY2)
+);
+
+CREATE TABLE WRITE_SET (
+ WS_DATABASE varchar2(128) NOT NULL,
+ WS_TABLE varchar2(128) NOT NULL,
+ WS_PARTITION varchar2(767),
+ WS_TXNID number(19) NOT NULL,
+ WS_COMMIT_ID number(19) NOT NULL,
+ WS_OPERATION_TYPE char(1) NOT NULL
+);
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/oracle/upgrade-1.2.0-to-1.3.0.oracle.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/oracle/upgrade-1.2.0-to-1.3.0.oracle.sql b/metastore/scripts/upgrade/oracle/upgrade-1.2.0-to-1.3.0.oracle.sql
index 94ee2c4..5939b34 100644
--- a/metastore/scripts/upgrade/oracle/upgrade-1.2.0-to-1.3.0.oracle.sql
+++ b/metastore/scripts/upgrade/oracle/upgrade-1.2.0-to-1.3.0.oracle.sql
@@ -11,6 +11,7 @@ SELECT 'Upgrading MetaStore schema from 1.2.0 to 1.3.0' AS Status from dual;
@030-HIVE-12823.oracle.sql;
@031-HIVE-12381.oracle.sql;
@032-HIVE-12832.oracle.sql;
+@035-HIVE-13395.oracle.sql;
UPDATE VERSION SET SCHEMA_VERSION='1.3.0', VERSION_COMMENT='Hive release version 1.3.0' where VER_ID=1;
SELECT 'Finished upgrading MetaStore schema from 1.2.0 to 1.3.0' AS Status from dual;
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/oracle/upgrade-2.0.0-to-2.1.0.oracle.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/oracle/upgrade-2.0.0-to-2.1.0.oracle.sql b/metastore/scripts/upgrade/oracle/upgrade-2.0.0-to-2.1.0.oracle.sql
index 8c065a1..a226d9a 100644
--- a/metastore/scripts/upgrade/oracle/upgrade-2.0.0-to-2.1.0.oracle.sql
+++ b/metastore/scripts/upgrade/oracle/upgrade-2.0.0-to-2.1.0.oracle.sql
@@ -2,6 +2,7 @@ SELECT 'Upgrading MetaStore schema from 2.0.0 to 2.1.0' AS Status from dual;
@033-HIVE-12892.oracle.sql;
@034-HIVE-13076.oracle.sql;
+@035-HIVE-13395.oracle.sql;
UPDATE VERSION SET SCHEMA_VERSION='2.1.0', VERSION_COMMENT='Hive release version 2.1.0' where VER_ID=1;
SELECT 'Finished upgrading MetaStore schema from 2.0.0 to 2.1.0' AS Status from dual;
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/postgres/034-HIVE-13395.postgres.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/postgres/034-HIVE-13395.postgres.sql b/metastore/scripts/upgrade/postgres/034-HIVE-13395.postgres.sql
new file mode 100644
index 0000000..4dda283
--- /dev/null
+++ b/metastore/scripts/upgrade/postgres/034-HIVE-13395.postgres.sql
@@ -0,0 +1,10 @@
+CREATE TABLE WRITE_SET (
+ WS_DATABASE varchar(128) NOT NULL,
+ WS_TABLE varchar(128) NOT NULL,
+ WS_PARTITION varchar(767),
+ WS_TXNID bigint NOT NULL,
+ WS_COMMIT_ID bigint NOT NULL,
+ WS_OPERATION_TYPE char(1) NOT NULL
+);
+
+ALTER TABLE TXN_COMPONENTS ADD TC_OPERATION_TYPE char(1);
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/postgres/hive-schema-2.1.0.postgres.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/postgres/hive-schema-2.1.0.postgres.sql b/metastore/scripts/upgrade/postgres/hive-schema-2.1.0.postgres.sql
index e209489..43e984c 100644
--- a/metastore/scripts/upgrade/postgres/hive-schema-2.1.0.postgres.sql
+++ b/metastore/scripts/upgrade/postgres/hive-schema-2.1.0.postgres.sql
@@ -1480,7 +1480,7 @@ GRANT ALL ON SCHEMA public TO PUBLIC;
------------------------------
-- Transaction and lock tables
------------------------------
-\i hive-txn-schema-2.0.0.postgres.sql;
+\i hive-txn-schema-2.1.0.postgres.sql;
-- -----------------------------------------------------------------
-- Record schema version. Should be the last step in the init script
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/postgres/hive-txn-schema-1.3.0.postgres.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/postgres/hive-txn-schema-1.3.0.postgres.sql b/metastore/scripts/upgrade/postgres/hive-txn-schema-1.3.0.postgres.sql
index b2fc1a8..b606f81 100644
--- a/metastore/scripts/upgrade/postgres/hive-txn-schema-1.3.0.postgres.sql
+++ b/metastore/scripts/upgrade/postgres/hive-txn-schema-1.3.0.postgres.sql
@@ -33,7 +33,8 @@ CREATE TABLE TXN_COMPONENTS (
TC_TXNID bigint REFERENCES TXNS (TXN_ID),
TC_DATABASE varchar(128) NOT NULL,
TC_TABLE varchar(128),
- TC_PARTITION varchar(767) DEFAULT NULL
+ TC_PARTITION varchar(767) DEFAULT NULL,
+ TC_OPERATION_TYPE char(1) NOT NULL
);
CREATE TABLE COMPLETED_TXN_COMPONENTS (
@@ -118,4 +119,12 @@ CREATE TABLE AUX_TABLE (
PRIMARY KEY(MT_KEY1, MT_KEY2)
);
+CREATE TABLE WRITE_SET (
+ WS_DATABASE varchar(128) NOT NULL,
+ WS_TABLE varchar(128) NOT NULL,
+ WS_PARTITION varchar(767),
+ WS_TXNID bigint NOT NULL,
+ WS_COMMIT_ID bigint NOT NULL,
+ WS_OPERATION_TYPE char(1) NOT NULL
+);
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/postgres/hive-txn-schema-2.1.0.postgres.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/postgres/hive-txn-schema-2.1.0.postgres.sql b/metastore/scripts/upgrade/postgres/hive-txn-schema-2.1.0.postgres.sql
new file mode 100644
index 0000000..262b93e
--- /dev/null
+++ b/metastore/scripts/upgrade/postgres/hive-txn-schema-2.1.0.postgres.sql
@@ -0,0 +1,129 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+--
+-- Tables for transaction management
+--
+
+CREATE TABLE TXNS (
+ TXN_ID bigint PRIMARY KEY,
+ TXN_STATE char(1) NOT NULL,
+ TXN_STARTED bigint NOT NULL,
+ TXN_LAST_HEARTBEAT bigint NOT NULL,
+ TXN_USER varchar(128) NOT NULL,
+ TXN_HOST varchar(128) NOT NULL,
+ TXN_AGENT_INFO varchar(128),
+ TXN_META_INFO varchar(128),
+ TXN_HEARTBEAT_COUNT integer
+);
+
+CREATE TABLE TXN_COMPONENTS (
+ TC_TXNID bigint REFERENCES TXNS (TXN_ID),
+ TC_DATABASE varchar(128) NOT NULL,
+ TC_TABLE varchar(128),
+ TC_PARTITION varchar(767) DEFAULT NULL,
+ TC_OPERATION_TYPE char(1) NOT NULL
+);
+
+CREATE TABLE COMPLETED_TXN_COMPONENTS (
+ CTC_TXNID bigint,
+ CTC_DATABASE varchar(128) NOT NULL,
+ CTC_TABLE varchar(128),
+ CTC_PARTITION varchar(767)
+);
+
+CREATE TABLE NEXT_TXN_ID (
+ NTXN_NEXT bigint NOT NULL
+);
+INSERT INTO NEXT_TXN_ID VALUES(1);
+
+CREATE TABLE HIVE_LOCKS (
+ HL_LOCK_EXT_ID bigint NOT NULL,
+ HL_LOCK_INT_ID bigint NOT NULL,
+ HL_TXNID bigint,
+ HL_DB varchar(128) NOT NULL,
+ HL_TABLE varchar(128),
+ HL_PARTITION varchar(767) DEFAULT NULL,
+ HL_LOCK_STATE char(1) NOT NULL,
+ HL_LOCK_TYPE char(1) NOT NULL,
+ HL_LAST_HEARTBEAT bigint NOT NULL,
+ HL_ACQUIRED_AT bigint,
+ HL_USER varchar(128) NOT NULL,
+ HL_HOST varchar(128) NOT NULL,
+ HL_HEARTBEAT_COUNT integer,
+ HL_AGENT_INFO varchar(128),
+ HL_BLOCKEDBY_EXT_ID bigint,
+ HL_BLOCKEDBY_INT_ID bigint,
+ PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID)
+);
+
+CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS USING hash (HL_TXNID);
+
+CREATE TABLE NEXT_LOCK_ID (
+ NL_NEXT bigint NOT NULL
+);
+INSERT INTO NEXT_LOCK_ID VALUES(1);
+
+CREATE TABLE COMPACTION_QUEUE (
+ CQ_ID bigint PRIMARY KEY,
+ CQ_DATABASE varchar(128) NOT NULL,
+ CQ_TABLE varchar(128) NOT NULL,
+ CQ_PARTITION varchar(767),
+ CQ_STATE char(1) NOT NULL,
+ CQ_TYPE char(1) NOT NULL,
+ CQ_WORKER_ID varchar(128),
+ CQ_START bigint,
+ CQ_RUN_AS varchar(128),
+ CQ_HIGHEST_TXN_ID bigint,
+ CQ_META_INFO bytea,
+ CQ_HADOOP_JOB_ID varchar(32)
+);
+
+CREATE TABLE NEXT_COMPACTION_QUEUE_ID (
+ NCQ_NEXT bigint NOT NULL
+);
+INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1);
+
+CREATE TABLE COMPLETED_COMPACTIONS (
+ CC_ID bigint PRIMARY KEY,
+ CC_DATABASE varchar(128) NOT NULL,
+ CC_TABLE varchar(128) NOT NULL,
+ CC_PARTITION varchar(767),
+ CC_STATE char(1) NOT NULL,
+ CC_TYPE char(1) NOT NULL,
+ CC_WORKER_ID varchar(128),
+ CC_START bigint,
+ CC_END bigint,
+ CC_RUN_AS varchar(128),
+ CC_HIGHEST_TXN_ID bigint,
+ CC_META_INFO bytea,
+ CC_HADOOP_JOB_ID varchar(32)
+);
+
+CREATE TABLE AUX_TABLE (
+ MT_KEY1 varchar(128) NOT NULL,
+ MT_KEY2 bigint NOT NULL,
+ MT_COMMENT varchar(255),
+ PRIMARY KEY(MT_KEY1, MT_KEY2)
+);
+
+CREATE TABLE WRITE_SET (
+ WS_DATABASE varchar(128) NOT NULL,
+ WS_TABLE varchar(128) NOT NULL,
+ WS_PARTITION varchar(767),
+ WS_TXNID bigint NOT NULL,
+ WS_COMMIT_ID bigint NOT NULL,
+ WS_OPERATION_TYPE char(1) NOT NULL
+);
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/postgres/upgrade-1.2.0-to-1.3.0.postgres.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/postgres/upgrade-1.2.0-to-1.3.0.postgres.sql b/metastore/scripts/upgrade/postgres/upgrade-1.2.0-to-1.3.0.postgres.sql
index 6eb5620..b1bcac0 100644
--- a/metastore/scripts/upgrade/postgres/upgrade-1.2.0-to-1.3.0.postgres.sql
+++ b/metastore/scripts/upgrade/postgres/upgrade-1.2.0-to-1.3.0.postgres.sql
@@ -11,6 +11,7 @@ SELECT 'Upgrading MetaStore schema from 1.2.0 to 1.3.0';
\i 029-HIVE-12823.postgres.sql;
\i 030-HIVE-12831.postgres.sql;
\i 031-HIVE-12832.postgres.sql;
+\i 034-HIVE-13395.postgres.sql;
UPDATE "VERSION" SET "SCHEMA_VERSION"='1.3.0', "VERSION_COMMENT"='Hive release version 1.3.0' where "VER_ID"=1;
SELECT 'Finished upgrading MetaStore schema from 1.2.0 to 1.3.0';
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/postgres/upgrade-2.0.0-to-2.1.0.postgres.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/postgres/upgrade-2.0.0-to-2.1.0.postgres.sql b/metastore/scripts/upgrade/postgres/upgrade-2.0.0-to-2.1.0.postgres.sql
index e96a6ec..7fc603f 100644
--- a/metastore/scripts/upgrade/postgres/upgrade-2.0.0-to-2.1.0.postgres.sql
+++ b/metastore/scripts/upgrade/postgres/upgrade-2.0.0-to-2.1.0.postgres.sql
@@ -2,6 +2,7 @@ SELECT 'Upgrading MetaStore schema from 2.0.0 to 2.1.0';
\i 032-HIVE-12892.postgres.sql;
\i 033-HIVE-13076.postgres.sql;
+\i 034-HIVE-13395.postgres.sql;
UPDATE "VERSION" SET "SCHEMA_VERSION"='2.1.0', "VERSION_COMMENT"='Hive release version 2.1.0' where "VER_ID"=1;
SELECT 'Finished upgrading MetaStore schema from 2.0.0 to 2.1.0';
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 9a09e7a..044b960 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -6748,6 +6748,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService"));
startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidCompactionHistoryService"));
+ startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidWriteSetService"));
}
private static void startHouseKeeperService(HiveConf conf, Class c) throws Exception {
//todo: when metastore adds orderly-shutdown logic, houseKeeper.stop()
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index c82d23a..facce54 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -21,11 +21,13 @@ import java.sql.Connection;
import java.sql.Driver;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.SQLTransactionRollbackException;
import java.sql.Statement;
import java.util.Properties;
+import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -82,7 +84,8 @@ public final class TxnDbUtil {
" TC_TXNID bigint REFERENCES TXNS (TXN_ID)," +
" TC_DATABASE varchar(128) NOT NULL," +
" TC_TABLE varchar(128)," +
- " TC_PARTITION varchar(767))");
+ " TC_PARTITION varchar(767)," +
+ " TC_OPERATION_TYPE char(1) NOT NULL)");
stmt.execute("CREATE TABLE COMPLETED_TXN_COMPONENTS (" +
" CTC_TXNID bigint," +
" CTC_DATABASE varchar(128) NOT NULL," +
@@ -146,18 +149,24 @@ public final class TxnDbUtil {
" CC_HADOOP_JOB_ID varchar(32))");
stmt.execute("CREATE TABLE AUX_TABLE (" +
- " MT_KEY1 varchar(128) NOT NULL," +
- " MT_KEY2 bigint NOT NULL," +
- " MT_COMMENT varchar(255)," +
- " PRIMARY KEY(MT_KEY1, MT_KEY2)" +
- ")");
-
- conn.commit();
+ " MT_KEY1 varchar(128) NOT NULL," +
+ " MT_KEY2 bigint NOT NULL," +
+ " MT_COMMENT varchar(255)," +
+ " PRIMARY KEY(MT_KEY1, MT_KEY2))");
+
+ stmt.execute("CREATE TABLE WRITE_SET (" +
+ " WS_DATABASE varchar(128) NOT NULL," +
+ " WS_TABLE varchar(128) NOT NULL," +
+ " WS_PARTITION varchar(767)," +
+ " WS_TXNID bigint NOT NULL," +
+ " WS_COMMIT_ID bigint NOT NULL," +
+ " WS_OPERATION_TYPE char(1) NOT NULL)"
+ );
} catch (SQLException e) {
try {
conn.rollback();
} catch (SQLException re) {
- System.err.println("Error rolling back: " + re.getMessage());
+ LOG.error("Error rolling back: " + re.getMessage());
}
// This might be a deadlock, if so, let's retry
@@ -174,41 +183,60 @@ public final class TxnDbUtil {
}
public static void cleanDb() throws Exception {
- Connection conn = null;
- Statement stmt = null;
- try {
- conn = getConnection();
- stmt = conn.createStatement();
-
- // We want to try these, whether they succeed or fail.
+ int retryCount = 0;
+ while(++retryCount <= 3) {
+ boolean success = true;
+ Connection conn = null;
+ Statement stmt = null;
try {
- stmt.execute("DROP INDEX HL_TXNID_INDEX");
- } catch (Exception e) {
- System.err.println("Unable to drop index HL_TXNID_INDEX " + e.getMessage());
- }
+ conn = getConnection();
+ stmt = conn.createStatement();
- dropTable(stmt, "TXN_COMPONENTS");
- dropTable(stmt, "COMPLETED_TXN_COMPONENTS");
- dropTable(stmt, "TXNS");
- dropTable(stmt, "NEXT_TXN_ID");
- dropTable(stmt, "HIVE_LOCKS");
- dropTable(stmt, "NEXT_LOCK_ID");
- dropTable(stmt, "COMPACTION_QUEUE");
- dropTable(stmt, "NEXT_COMPACTION_QUEUE_ID");
- dropTable(stmt, "COMPLETED_COMPACTIONS");
- dropTable(stmt, "AUX_TABLE");
- conn.commit();
- } finally {
- closeResources(conn, stmt, null);
+ // We want to try these, whether they succeed or fail.
+ try {
+ stmt.execute("DROP INDEX HL_TXNID_INDEX");
+ } catch (SQLException e) {
+ if(!("42X65".equals(e.getSQLState()) && 30000 == e.getErrorCode())) {
+ //42X65/3000 means index doesn't exist
+ LOG.error("Unable to drop index HL_TXNID_INDEX " + e.getMessage() +
+ "State=" + e.getSQLState() + " code=" + e.getErrorCode() + " retryCount=" + retryCount);
+ success = false;
+ }
+ }
+
+ success &= dropTable(stmt, "TXN_COMPONENTS", retryCount);
+ success &= dropTable(stmt, "COMPLETED_TXN_COMPONENTS", retryCount);
+ success &= dropTable(stmt, "TXNS", retryCount);
+ success &= dropTable(stmt, "NEXT_TXN_ID", retryCount);
+ success &= dropTable(stmt, "HIVE_LOCKS", retryCount);
+ success &= dropTable(stmt, "NEXT_LOCK_ID", retryCount);
+ success &= dropTable(stmt, "COMPACTION_QUEUE", retryCount);
+ success &= dropTable(stmt, "NEXT_COMPACTION_QUEUE_ID", retryCount);
+ success &= dropTable(stmt, "COMPLETED_COMPACTIONS", retryCount);
+ success &= dropTable(stmt, "AUX_TABLE", retryCount);
+ success &= dropTable(stmt, "WRITE_SET", retryCount);
+ } finally {
+ closeResources(conn, stmt, null);
+ }
+ if(success) {
+ return;
+ }
}
}
- private static void dropTable(Statement stmt, String name) {
+ private static boolean dropTable(Statement stmt, String name, int retryCount) throws SQLException {
try {
stmt.execute("DROP TABLE " + name);
- } catch (Exception e) {
- System.err.println("Unable to drop table " + name + ": " + e.getMessage());
+ return true;
+ } catch (SQLException e) {
+ if("42Y55".equals(e.getSQLState()) && 30000 == e.getErrorCode()) {
+ //failed because object doesn't exist
+ return true;
+ }
+ LOG.error("Unable to drop table " + name + ": " + e.getMessage() +
+ " State=" + e.getSQLState() + " code=" + e.getErrorCode() + " retryCount=" + retryCount);
}
+ return false;
}
/**
@@ -259,6 +287,32 @@ public final class TxnDbUtil {
closeResources(conn, stmt, rs);
}
}
+ @VisibleForTesting
+ public static String queryToString(String query) throws Exception {
+ Connection conn = null;
+ Statement stmt = null;
+ ResultSet rs = null;
+ StringBuilder sb = new StringBuilder();
+ try {
+ conn = getConnection();
+ stmt = conn.createStatement();
+ rs = stmt.executeQuery(query);
+ ResultSetMetaData rsmd = rs.getMetaData();
+ for(int colPos = 1; colPos <= rsmd.getColumnCount(); colPos++) {
+ sb.append(rsmd.getColumnName(colPos)).append(" ");
+ }
+ sb.append('\n');
+ while(rs.next()) {
+ for (int colPos = 1; colPos <= rsmd.getColumnCount(); colPos++) {
+ sb.append(rs.getObject(colPos)).append(" ");
+ }
+ sb.append('\n');
+ }
+ } finally {
+ closeResources(conn, stmt, rs);
+ }
+ return sb.toString();
+ }
static Connection getConnection() throws Exception {
HiveConf conf = new HiveConf();
@@ -272,7 +326,7 @@ public final class TxnDbUtil {
prop.setProperty("user", user);
prop.setProperty("password", passwd);
Connection conn = driver.connect(driverUrl, prop);
- conn.setAutoCommit(false);
+ conn.setAutoCommit(true);
return conn;
}
@@ -281,7 +335,7 @@ public final class TxnDbUtil {
try {
rs.close();
} catch (SQLException e) {
- System.err.println("Error closing ResultSet: " + e.getMessage());
+ LOG.error("Error closing ResultSet: " + e.getMessage());
}
}
[09/13] hive git commit: HIVE-13701: LLAP: Use different prefix for
llap task scheduler metrics (Prasanth Jayachandran reviewed by Sergey
Shelukhin)
Posted by jd...@apache.org.
HIVE-13701: LLAP: Use different prefix for llap task scheduler metrics (Prasanth Jayachandran reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0cc40456
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0cc40456
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0cc40456
Branch: refs/heads/llap
Commit: 0cc40456586aa5f3c54a34ceaf65eaef9a3d311b
Parents: 3517a99
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Thu May 5 21:43:48 2016 -0500
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Thu May 5 21:43:48 2016 -0500
----------------------------------------------------------------------
...doop-metrics2-llapdaemon.properties.template | 50 ++++++++++++++++++++
...trics2-llaptaskscheduler.properties.template | 50 ++++++++++++++++++++
.../hadoop-metrics2.properties.template | 50 --------------------
.../tezplugins/LlapTaskSchedulerService.java | 2 +-
.../metrics/LlapTaskSchedulerMetrics.java | 6 +--
5 files changed, 104 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/0cc40456/llap-server/src/main/resources/hadoop-metrics2-llapdaemon.properties.template
----------------------------------------------------------------------
diff --git a/llap-server/src/main/resources/hadoop-metrics2-llapdaemon.properties.template b/llap-server/src/main/resources/hadoop-metrics2-llapdaemon.properties.template
new file mode 100644
index 0000000..994acaa
--- /dev/null
+++ b/llap-server/src/main/resources/hadoop-metrics2-llapdaemon.properties.template
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#}
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# syntax: [prefix].[source|sink].[instance].[options]
+# See javadoc of package-info.java for org.apache.hadoop.metrics2 for details
+
+#*.sink.file.class=org.apache.hadoop.metrics2.sink.FileSink
+# default sampling period, in seconds
+#*.sink.file.period=10
+
+# *.sink.timeline.class=org.apache.hadoop.metrics2.sink.timeline.HadoopTimelineMetricsSink
+# *.sink.timeline.period=60
+
+# llapdeamon metrics for all contexts (jvm,queue,executors,cache) will go to this file
+# llapdaemon.sink.file.filename=llapdaemon-metrics.out
+
+# to configure separate files per context define following for each context
+# llapdaemon.sink.file_jvm.class=org.apache.hadoop.metrics2.sink.FileSink
+# llapdaemon.sink.file_jvm.context=jvm
+# llapdaemon.sink.file_jvm.filename=llapdaemon-jvm-metrics.out
http://git-wip-us.apache.org/repos/asf/hive/blob/0cc40456/llap-server/src/main/resources/hadoop-metrics2-llaptaskscheduler.properties.template
----------------------------------------------------------------------
diff --git a/llap-server/src/main/resources/hadoop-metrics2-llaptaskscheduler.properties.template b/llap-server/src/main/resources/hadoop-metrics2-llaptaskscheduler.properties.template
new file mode 100644
index 0000000..5cf71a7
--- /dev/null
+++ b/llap-server/src/main/resources/hadoop-metrics2-llaptaskscheduler.properties.template
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#}
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# syntax: [prefix].[source|sink].[instance].[options]
+# See javadoc of package-info.java for org.apache.hadoop.metrics2 for details
+
+#*.sink.file.class=org.apache.hadoop.metrics2.sink.FileSink
+# default sampling period, in seconds
+#*.sink.file.period=10
+
+# *.sink.timeline.class=org.apache.hadoop.metrics2.sink.timeline.HadoopTimelineMetricsSink
+# *.sink.timeline.period=60
+
+# llapdeamon metrics for all contexts (jvm,queue,executors,cache) will go to this file
+# llaptaskscheduler.sink.file.filename=llaptaskscheduler-metrics.out
+
+# to configure separate files per context define following for each context
+# llaptaskscheduler.sink.file_jvm.class=org.apache.hadoop.metrics2.sink.FileSink
+# llaptaskscheduler.sink.file_jvm.context=jvm
+# llaptaskscheduler.sink.file_jvm.filename=llaptaskscheduler-jvm-metrics.out
http://git-wip-us.apache.org/repos/asf/hive/blob/0cc40456/llap-server/src/main/resources/hadoop-metrics2.properties.template
----------------------------------------------------------------------
diff --git a/llap-server/src/main/resources/hadoop-metrics2.properties.template b/llap-server/src/main/resources/hadoop-metrics2.properties.template
deleted file mode 100644
index 994acaa..0000000
--- a/llap-server/src/main/resources/hadoop-metrics2.properties.template
+++ /dev/null
@@ -1,50 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#}
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# syntax: [prefix].[source|sink].[instance].[options]
-# See javadoc of package-info.java for org.apache.hadoop.metrics2 for details
-
-#*.sink.file.class=org.apache.hadoop.metrics2.sink.FileSink
-# default sampling period, in seconds
-#*.sink.file.period=10
-
-# *.sink.timeline.class=org.apache.hadoop.metrics2.sink.timeline.HadoopTimelineMetricsSink
-# *.sink.timeline.period=60
-
-# llapdeamon metrics for all contexts (jvm,queue,executors,cache) will go to this file
-# llapdaemon.sink.file.filename=llapdaemon-metrics.out
-
-# to configure separate files per context define following for each context
-# llapdaemon.sink.file_jvm.class=org.apache.hadoop.metrics2.sink.FileSink
-# llapdaemon.sink.file_jvm.context=jvm
-# llapdaemon.sink.file_jvm.filename=llapdaemon-jvm-metrics.out
http://git-wip-us.apache.org/repos/asf/hive/blob/0cc40456/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index da1e17f..733049d 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -267,7 +267,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
if (initMetrics) {
// Initialize the metrics system
- LlapMetricsSystem.initialize("LlapDaemon");
+ LlapMetricsSystem.initialize("LlapTaskScheduler");
this.pauseMonitor = new JvmPauseMonitor(conf);
pauseMonitor.start();
String displayName = "LlapTaskSchedulerMetrics-" + MetricsUtils.getHostName();
http://git-wip-us.apache.org/repos/asf/hive/blob/0cc40456/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java
index b3230e2..04fd815 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java
@@ -46,9 +46,9 @@ import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.metrics2.source.JvmMetrics;
/**
- * Metrics about the llap daemon task scheduler.
+ * Metrics about the llap task scheduler.
*/
-@Metrics(about = "LlapDaemon Task Scheduler Metrics", context = "scheduler")
+@Metrics(about = "Llap Task Scheduler Metrics", context = "scheduler")
public class LlapTaskSchedulerMetrics implements MetricsSource {
private final String name;
@@ -99,7 +99,7 @@ public class LlapTaskSchedulerMetrics implements MetricsSource {
public void getMetrics(MetricsCollector collector, boolean b) {
MetricsRecordBuilder rb = collector.addRecord(SchedulerMetrics)
.setContext("scheduler")
- .tag(ProcessName, MetricsUtils.METRICS_PROCESS_NAME)
+ .tag(ProcessName, "DAGAppMaster")
.tag(SessionId, sessionId);
getTaskSchedulerStats(rb);
}
[07/13] hive git commit: HIVE-13395 (addednum) Lost Update problem in
ACID (Eugene Koifman, reviewed by Alan Gates)
Posted by jd...@apache.org.
HIVE-13395 (addednum) Lost Update problem in ACID (Eugene Koifman, reviewed by Alan Gates)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/eb2c54b3
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/eb2c54b3
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/eb2c54b3
Branch: refs/heads/llap
Commit: eb2c54b3f80d958c36c22dfb0ee962806e673830
Parents: 794f161
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Thu May 5 15:29:00 2016 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Thu May 5 15:29:00 2016 -0700
----------------------------------------------------------------------
.../scripts/upgrade/mysql/hive-txn-schema-1.3.0.mysql.sql | 10 ++++++++++
1 file changed, 10 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/eb2c54b3/metastore/scripts/upgrade/mysql/hive-txn-schema-1.3.0.mysql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mysql/hive-txn-schema-1.3.0.mysql.sql b/metastore/scripts/upgrade/mysql/hive-txn-schema-1.3.0.mysql.sql
index ea42757..d873012 100644
--- a/metastore/scripts/upgrade/mysql/hive-txn-schema-1.3.0.mysql.sql
+++ b/metastore/scripts/upgrade/mysql/hive-txn-schema-1.3.0.mysql.sql
@@ -34,6 +34,7 @@ CREATE TABLE TXN_COMPONENTS (
TC_DATABASE varchar(128) NOT NULL,
TC_TABLE varchar(128),
TC_PARTITION varchar(767),
+ TC_OPERATION_TYPE char(1) NOT NULL,
FOREIGN KEY (TC_TXNID) REFERENCES TXNS (TXN_ID)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
@@ -120,3 +121,12 @@ CREATE TABLE AUX_TABLE (
PRIMARY KEY(MT_KEY1, MT_KEY2)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+CREATE TABLE WRITE_SET (
+ WS_DATABASE varchar(128) NOT NULL,
+ WS_TABLE varchar(128) NOT NULL,
+ WS_PARTITION varchar(767),
+ WS_TXNID bigint NOT NULL,
+ WS_COMMIT_ID bigint NOT NULL,
+ WS_OPERATION_TYPE char(1) NOT NULL
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
[02/13] hive git commit: HIVE-13395 Lost Update problem in ACID
(Eugene Koifman, reviewed by Alan Gates)
Posted by jd...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index c0fa97a..06cd4aa 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -72,7 +72,7 @@ import java.util.regex.Pattern;
* used to properly sequence operations. Most notably:
* 1. various sequence IDs are generated with aid of this mutex
* 2. ensuring that each (Hive) Transaction state is transitioned atomically. Transaction state
- * includes it's actual state (Open, Aborted) as well as it's lock list/component list. Thus all
+ * includes its actual state (Open, Aborted) as well as it's lock list/component list. Thus all
* per transaction ops, either start by update/delete of the relevant TXNS row or do S4U on that row.
* This allows almost all operations to run at READ_COMMITTED and minimizes DB deadlocks.
* 3. checkLock() - this is mutexted entirely since we must ensure that while we check if some lock
@@ -126,6 +126,41 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
static private DataSource connPool;
static private boolean doRetryOnConnPool = false;
+
+ private enum OpertaionType {
+ INSERT('i'), UPDATE('u'), DELETE('d');
+ private final char sqlConst;
+ OpertaionType(char sqlConst) {
+ this.sqlConst = sqlConst;
+ }
+ public String toString() {
+ return Character.toString(sqlConst);
+ }
+ public static OpertaionType fromString(char sqlConst) {
+ switch (sqlConst) {
+ case 'i':
+ return INSERT;
+ case 'u':
+ return UPDATE;
+ case 'd':
+ return DELETE;
+ default:
+ throw new IllegalArgumentException(quoteChar(sqlConst));
+ }
+ }
+ //we should instead just pass in OpertaionType from client (HIVE-13622)
+ @Deprecated
+ public static OpertaionType fromLockType(LockType lockType) {
+ switch (lockType) {
+ case SHARED_READ:
+ return INSERT;
+ case SHARED_WRITE:
+ return UPDATE;
+ default:
+ throw new IllegalArgumentException("Unexpected lock type: " + lockType);
+ }
+ }
+ }
/**
* Number of consecutive deadlocks we have seen
@@ -454,6 +489,31 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
}
+ /**
+ * Concurrency/isolation notes:
+ * This is mutexed with {@link #openTxns(OpenTxnRequest)} and other {@link #commitTxn(CommitTxnRequest)}
+ * operations using select4update on NEXT_TXN_ID. Also, mutexes on TXNX table for specific txnid:X
+ * see more notes below.
+ * In order to prevent lost updates, we need to determine if any 2 transactions overlap. Each txn
+ * is viewed as an interval [M,N]. M is the txnid and N is taken from the same NEXT_TXN_ID sequence
+ * so that we can compare commit time of txn T with start time of txn S. This sequence can be thought of
+ * as a logical time counter. If S.commitTime < T.startTime, T and S do NOT overlap.
+ *
+ * Motivating example:
+ * Suppose we have multi-statment transactions T and S both of which are attempting x = x + 1
+ * In order to prevent lost update problem, the the non-overlapping txns must lock in the snapshot
+ * that they read appropriately. In particular, if txns do not overlap, then one follows the other
+ * (assumig they write the same entity), and thus the 2nd must see changes of the 1st. We ensure
+ * this by locking in snapshot after
+ * {@link #openTxns(OpenTxnRequest)} call is made (see {@link org.apache.hadoop.hive.ql.Driver#acquireLocksAndOpenTxn()})
+ * and mutexing openTxn() with commit(). In other words, once a S.commit() starts we must ensure
+ * that txn T which will be considered a later txn, locks in a snapshot that includes the result
+ * of S's commit (assuming no other txns).
+ * As a counter example, suppose we have S[3,3] and T[4,4] (commitId=txnid means no other transactions
+ * were running in parallel). If T and S both locked in the same snapshot (for example commit of
+ * txnid:2, which is possible if commitTxn() and openTxnx() is not mutexed)
+ * 'x' would be updated to the same value by both, i.e. lost update.
+ */
public void commitTxn(CommitTxnRequest rqst)
throws NoSuchTxnException, TxnAbortedException, MetaException {
long txnid = rqst.getTxnid();
@@ -461,40 +521,116 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
Connection dbConn = null;
Statement stmt = null;
ResultSet lockHandle = null;
+ ResultSet commitIdRs = null, rs;
try {
lockInternal();
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ /**
+ * This S4U will mutex with other commitTxn() and openTxns().
+ * -1 below makes txn intervals look like [3,3] [4,4] if all txns are serial
+ * Note: it's possible to have several txns have the same commit id. Suppose 3 txns start
+ * at the same time and no new txns start until all 3 commit.
+ * We could've incremented the sequence for commitId is well but it doesn't add anything functionally.
+ */
+ commitIdRs = stmt.executeQuery(addForUpdateClause("select ntxn_next - 1 from NEXT_TXN_ID"));
+ if(!commitIdRs.next()) {
+ throw new IllegalStateException("No rows found in NEXT_TXN_ID");
+ }
+ long commitId = commitIdRs.getLong(1);
/**
* Runs at READ_COMMITTED with S4U on TXNS row for "txnid". S4U ensures that no other
* operation can change this txn (such acquiring locks). While lock() and commitTxn()
* should not normally run concurrently (for same txn) but could due to bugs in the client
* which could then corrupt internal transaction manager state. Also competes with abortTxn().
*/
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
- stmt = dbConn.createStatement();
-
lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN);
if(lockHandle == null) {
//this also ensures that txn is still there and in expected state (hasn't been timed out)
ensureValidTxn(dbConn, txnid, stmt);
shouldNeverHappen(txnid);
}
-
+ Savepoint undoWriteSetForCurrentTxn = dbConn.setSavepoint();
+ int numCompsWritten = stmt.executeUpdate("insert into WRITE_SET (ws_database, ws_table, ws_partition, ws_txnid, ws_commit_id, ws_operation_type)" +
+ " select tc_database, tc_table, tc_partition, tc_txnid, " + commitId + ", tc_operation_type " +
+ "from TXN_COMPONENTS where tc_txnid=" + txnid + " and tc_operation_type IN(" + quoteChar(OpertaionType.UPDATE.sqlConst) + "," + quoteChar(OpertaionType.DELETE.sqlConst) + ")");
+ if(numCompsWritten == 0) {
+ /**
+ * current txn didn't update/delete anything (may have inserted), so just proceed with commit
+ *
+ * We only care about commit id for write txns, so for RO (when supported) txns we don't
+ * have to mutex on NEXT_TXN_ID.
+ * Consider: if RO txn is after a W txn, then RO's openTxns() will be mutexed with W's
+ * commitTxn() because both do S4U on NEXT_TXN_ID and thus RO will see result of W txn.
+ * If RO < W, then there is no reads-from relationship.
+ */
+ }
+ else {
+ /**
+ * see if there are any overlapping txns wrote the same element, i.e. have a conflict
+ * Since entire commit operation is mutexed wrt other start/commit ops,
+ * committed.ws_commit_id <= current.ws_commit_id for all txns
+ * thus if committed.ws_commit_id < current.ws_txnid, transactions do NOT overlap
+ * For example, [17,20] is committed, [6,80] is being committed right now - these overlap
+ * [17,20] committed and [21,21] committing now - these do not overlap.
+ * [17,18] committed and [18,19] committing now - these overlap (here 18 started while 17 was still running)
+ */
+ rs = stmt.executeQuery
+ (addLimitClause(1, "committed.ws_txnid, committed.ws_commit_id, committed.ws_database," +
+ "committed.ws_table, committed.ws_partition, cur.ws_commit_id " +
+ "from WRITE_SET committed INNER JOIN WRITE_SET cur " +
+ "ON committed.ws_database=cur.ws_database and committed.ws_table=cur.ws_table " +
+ //For partitioned table we always track writes at partition level (never at table)
+ //and for non partitioned - always at table level, thus the same table should never
+ //have entries with partition key and w/o
+ "and (committed.ws_partition=cur.ws_partition or (committed.ws_partition is null and cur.ws_partition is null)) " +
+ "where cur.ws_txnid <= committed.ws_commit_id" + //txns overlap; could replace ws_txnid
+ // with txnid, though any decent DB should infer this
+ " and cur.ws_txnid=" + txnid + //make sure RHS of join only has rows we just inserted as
+ // part of this commitTxn() op
+ " and committed.ws_txnid <> " + txnid + //and LHS only has committed txns
+ //U+U and U+D is a conflict but D+D is not and we don't currently track I in WRITE_SET at all
+ " and (committed.ws_operation_type=" + quoteChar(OpertaionType.UPDATE.sqlConst) +
+ " OR cur.ws_operation_type=" + quoteChar(OpertaionType.UPDATE.sqlConst) + ")"));
+ if(rs.next()) {
+ //found a conflict
+ String committedTxn = "[" + JavaUtils.txnIdToString(rs.getLong(1)) + "," + rs.getLong(2) + "]";
+ StringBuilder resource = new StringBuilder(rs.getString(3)).append("/").append(rs.getString(4));
+ String partitionName = rs.getString(5);
+ if(partitionName != null) {
+ resource.append('/').append(partitionName);
+ }
+ String msg = "Aborting [" + JavaUtils.txnIdToString(txnid) + "," + rs.getLong(6) + "]" + " due to a write conflict on " + resource +
+ " committed by " + committedTxn;
+ close(rs);
+ //remove WRITE_SET info for current txn since it's about to abort
+ dbConn.rollback(undoWriteSetForCurrentTxn);
+ LOG.info(msg);
+ //todo: should make abortTxns() write something into TXNS.TXN_META_INFO about this
+ if(abortTxns(dbConn, Collections.singletonList(txnid)) != 1) {
+ throw new IllegalStateException(msg + " FAILED!");
+ }
+ dbConn.commit();
+ close(null, stmt, dbConn);
+ throw new TxnAbortedException(msg);
+ }
+ else {
+ //no conflicting operations, proceed with the rest of commit sequence
+ }
+ }
// Move the record from txn_components into completed_txn_components so that the compactor
// knows where to look to compact.
String s = "insert into COMPLETED_TXN_COMPONENTS select tc_txnid, tc_database, tc_table, " +
"tc_partition from TXN_COMPONENTS where tc_txnid = " + txnid;
LOG.debug("Going to execute insert <" + s + ">");
if (stmt.executeUpdate(s) < 1) {
- //this can be reasonable for an empty txn START/COMMIT
+ //this can be reasonable for an empty txn START/COMMIT or read-only txn
LOG.info("Expected to move at least one record from txn_components to " +
"completed_txn_components when committing txn! " + JavaUtils.txnIdToString(txnid));
}
-
- // Always access TXN_COMPONENTS before HIVE_LOCKS;
s = "delete from TXN_COMPONENTS where tc_txnid = " + txnid;
LOG.debug("Going to execute update <" + s + ">");
stmt.executeUpdate(s);
- // Always access HIVE_LOCKS before TXNS
s = "delete from HIVE_LOCKS where hl_txnid = " + txnid;
LOG.debug("Going to execute update <" + s + ">");
stmt.executeUpdate(s);
@@ -510,6 +646,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
throw new MetaException("Unable to update transaction database "
+ StringUtils.stringifyException(e));
} finally {
+ close(commitIdRs);
close(lockHandle, stmt, dbConn);
unlockInternal();
}
@@ -517,7 +654,50 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
commitTxn(rqst);
}
}
-
+ @Override
+ public void performWriteSetGC() {
+ Connection dbConn = null;
+ Statement stmt = null;
+ ResultSet rs = null;
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ rs = stmt.executeQuery("select ntxn_next - 1 from NEXT_TXN_ID");
+ if(!rs.next()) {
+ throw new IllegalStateException("NEXT_TXN_ID is empty: DB is corrupted");
+ }
+ long highestAllocatedTxnId = rs.getLong(1);
+ close(rs);
+ rs = stmt.executeQuery("select min(txn_id) from TXNS where txn_state=" + quoteChar(TXN_OPEN));
+ if(!rs.next()) {
+ throw new IllegalStateException("Scalar query returned no rows?!?!!");
+ }
+ long commitHighWaterMark;//all currently open txns (if any) have txnid >= than commitHighWaterMark
+ long lowestOpenTxnId = rs.getLong(1);
+ if(rs.wasNull()) {
+ //if here then there are no Open txns and highestAllocatedTxnId must be
+ //resolved (i.e. committed or aborted), either way
+ //there are no open txns with id <= highestAllocatedTxnId
+ //the +1 is there because "delete ..." below has < (which is correct for the case when
+ //there is an open txn
+ //Concurrency: even if new txn starts (or starts + commits) it is still true that
+ //there are no currently open txns that overlap with any committed txn with
+ //commitId <= commitHighWaterMark (as set on next line). So plain READ_COMMITTED is enough.
+ commitHighWaterMark = highestAllocatedTxnId + 1;
+ }
+ else {
+ commitHighWaterMark = lowestOpenTxnId;
+ }
+ int delCnt = stmt.executeUpdate("delete from WRITE_SET where ws_commit_id < " + commitHighWaterMark);
+ LOG.info("Deleted " + delCnt + " obsolete rows from WRTIE_SET");
+ dbConn.commit();
+ } catch (SQLException ex) {
+ LOG.warn("WriteSet GC failed due to " + getMessage(ex), ex);
+ }
+ finally {
+ close(rs, stmt, dbConn);
+ }
+ }
/**
* As much as possible (i.e. in absence of retries) we want both operations to be done on the same
* connection (but separate transactions). This avoid some flakiness in BONECP where if you
@@ -545,7 +725,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
/**
* Note that by definition select for update is divorced from update, i.e. you executeQuery() to read
- * and then executeUpdate(). One other alternative would be to actually update the row in TXNX but
+ * and then executeUpdate(). One other alternative would be to actually update the row in TXNS but
* to the same value as before thus forcing db to acquire write lock for duration of the transaction.
*
* There is no real reason to return the ResultSet here other than to make sure the reference to it
@@ -616,6 +796,19 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
stmt.executeUpdate(s);
if (txnid > 0) {
+ /**DBTxnManager#acquireLocks() knows if it's I/U/D (that's how it decides what lock to get)
+ * So if we add that to LockRequest we'll know that here
+ * Should probably add it to LockComponent so that if in the future we decide wo allow 1 LockRequest
+ * to contain LockComponent for multiple operations.
+ * Deriving it from lock info doesn't distinguish between Update and Delete
+ *
+ * QueryPlan has BaseSemanticAnalyzer which has acidFileSinks list of FileSinkDesc
+ * FileSinkDesc.table is ql.metadata.Table
+ * Table.tableSpec which is TableSpec, which has specType which is SpecType
+ * So maybe this can work to know that this is part of dynamic partition insert in which case
+ * we'll get addDynamicPartitions() call and should not write TXN_COMPONENTS here.
+ * In any case, that's an optimization for now; will be required when adding multi-stmt txns
+ */
// For each component in this lock request,
// add an entry to the txn_components table
// This must be done before HIVE_LOCKS is accessed
@@ -624,10 +817,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
String tblName = lc.getTablename();
String partName = lc.getPartitionname();
s = "insert into TXN_COMPONENTS " +
- "(tc_txnid, tc_database, tc_table, tc_partition) " +
+ "(tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type) " +
"values (" + txnid + ", '" + dbName + "', " +
(tblName == null ? "null" : "'" + tblName + "'") + ", " +
- (partName == null ? "null" : "'" + partName + "'") + ")";
+ (partName == null ? "null" : "'" + partName + "'")+ "," +
+ quoteString(OpertaionType.fromLockType(lc.getType()).toString()) + ")";
LOG.debug("Going to execute update <" + s + ">");
stmt.executeUpdate(s);
}
@@ -698,9 +892,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
lockInternal();
if(dbConn.isClosed()) {
//should only get here if retrying this op
- dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
}
- dbConn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
return checkLock(dbConn, extLockId);
} catch (SQLException e) {
LOG.debug("Going to rollback");
@@ -756,7 +949,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
//todo: strictly speaking there is a bug here. heartbeat*() commits but both heartbeat and
//checkLock() are in the same retry block, so if checkLock() throws, heartbeat is also retired
//extra heartbeat is logically harmless, but ...
- dbConn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
return checkLock(dbConn, extLockId);
} catch (SQLException e) {
LOG.debug("Going to rollback");
@@ -1162,11 +1354,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
throw new RuntimeException("This should never happen: " + JavaUtils.txnIdToString(txnid) + " "
+ JavaUtils.lockIdToString(extLockId) + " " + intLockId);
}
+
public void addDynamicPartitions(AddDynamicPartitions rqst)
throws NoSuchTxnException, TxnAbortedException, MetaException {
Connection dbConn = null;
Statement stmt = null;
ResultSet lockHandle = null;
+ ResultSet rs = null;
try {
try {
lockInternal();
@@ -1178,18 +1372,35 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
ensureValidTxn(dbConn, rqst.getTxnid(), stmt);
shouldNeverHappen(rqst.getTxnid());
}
+ //we should be able to get this from AddDynamicPartitions object longer term; in fact we'd have to
+ //for multi stmt txns if same table is written more than once per tx
+ // MoveTask knows if it's I/U/D
+ // MoveTask calls Hive.loadDynamicPartitions() which calls HiveMetaStoreClient.addDynamicPartitions()
+ // which ends up here so we'd need to add a field to AddDynamicPartitions.
+ String findOperationType = " tc_operation_type from TXN_COMPONENTS where tc_txnid=" + rqst.getTxnid()
+ + " and tc_database=" + quoteString(rqst.getDbname()) + " and tc_table=" + quoteString(rqst.getTablename());
+ //do limit 1 on this; currently they will all have the same operations
+ rs = stmt.executeQuery(addLimitClause(1, findOperationType));
+ if(!rs.next()) {
+ throw new IllegalStateException("Unable to determine tc_operation_type for " + JavaUtils.txnIdToString(rqst.getTxnid()));
+ }
+ OpertaionType ot = OpertaionType.fromString(rs.getString(1).charAt(0));
+
+ //what if a txn writes the same table > 1 time... let's go with this for now, but really
+ //need to not write this in the first place, i.e. make this delete not needed
+ //see enqueueLockWithRetry() - that's where we write to TXN_COMPONENTS
+ String deleteSql = "delete from TXN_COMPONENTS where tc_txnid=" + rqst.getTxnid() + " and tc_database=" +
+ quoteString(rqst.getDbname()) + " and tc_table=" + quoteString(rqst.getTablename());
+ //we delete the entries made by enqueueLockWithRetry() since those are based on lock information which is
+ //much "wider" than necessary in a lot of cases. Here on the other hand, we know exactly which
+ //partitions have been written to. w/o this WRITE_SET would contain entries for partitions not actually
+ //written to
+ stmt.executeUpdate(deleteSql);
for (String partName : rqst.getPartitionnames()) {
- StringBuilder buff = new StringBuilder();
- buff.append("insert into TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition) values (");
- buff.append(rqst.getTxnid());
- buff.append(", '");
- buff.append(rqst.getDbname());
- buff.append("', '");
- buff.append(rqst.getTablename());
- buff.append("', '");
- buff.append(partName);
- buff.append("')");
- String s = buff.toString();
+ String s =
+ "insert into TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type) values (" +
+ rqst.getTxnid() + "," + quoteString(rqst.getDbname()) + "," + quoteString(rqst.getTablename()) +
+ "," + quoteString(partName) + "," + quoteChar(ot.sqlConst) + ")";
LOG.debug("Going to execute update <" + s + ">");
stmt.executeUpdate(s);
}
@@ -1908,60 +2119,113 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
return txnId != 0;
}
/**
+ * Lock acquisition is meant to be fair, so every lock can only block on some lock with smaller
+ * hl_lock_ext_id by only checking earlier locks.
+ *
+ * For any given SQL statment all locks required by it are grouped under single extLockId and are
+ * granted all at once or all locks wait.
+ *
+ * This is expected to run at READ_COMMITTED.
+ *
* Note: this calls acquire() for (extLockId,intLockId) but extLockId is the same and we either take
* all locks for given extLockId or none. Would be more efficient to update state on all locks
- * at once. Semantics are the same since this is all part of the same txn@serializable.
+ * at once. Semantics are the same since this is all part of the same txn.
*
- * Lock acquisition is meant to be fair, so every lock can only block on some lock with smaller
- * hl_lock_ext_id by only checking earlier locks.
+ * If there is a concurrent commitTxn/rollbackTxn, those can only remove rows from HIVE_LOCKS.
+ * If they happen to be for the same txnid, there will be a WW conflict (in MS DB), if different txnid,
+ * checkLock() will in the worst case keep locks in Waiting state a little longer.
*/
- private LockResponse checkLock(Connection dbConn,
- long extLockId)
+ private LockResponse checkLock(Connection dbConn, long extLockId)
throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException, SQLException {
- if(dbConn.getTransactionIsolation() != Connection.TRANSACTION_SERIALIZABLE) {
- //longer term we should instead use AUX_TABLE/S4U to serialize all checkLock() operations
- //that would be less prone to deadlocks
- throw new IllegalStateException("Unexpected Isolation Level: " + dbConn.getTransactionIsolation());
- }
- List<LockInfo> locksBeingChecked = getLockInfoFromLockId(dbConn, extLockId);//being acquired now
+ TxnStore.MutexAPI.LockHandle handle = null;
+ Statement stmt = null;
+ ResultSet rs = null;
LockResponse response = new LockResponse();
- response.setLockid(extLockId);
-
- LOG.debug("checkLock(): Setting savepoint. extLockId=" + JavaUtils.lockIdToString(extLockId));
- Savepoint save = dbConn.setSavepoint();//todo: get rid of this
- StringBuilder query = new StringBuilder("select hl_lock_ext_id, " +
- "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " +
- "hl_lock_type, hl_txnid from HIVE_LOCKS where hl_db in (");
-
- Set<String> strings = new HashSet<String>(locksBeingChecked.size());
- for (LockInfo info : locksBeingChecked) {
- strings.add(info.db);
- }
- boolean first = true;
- for (String s : strings) {
- if (first) first = false;
- else query.append(", ");
- query.append('\'');
- query.append(s);
- query.append('\'');
- }
- query.append(")");
-
- // If any of the table requests are null, then I need to pull all the
- // table locks for this db.
- boolean sawNull = false;
- strings.clear();
- for (LockInfo info : locksBeingChecked) {
- if (info.table == null) {
- sawNull = true;
- break;
- } else {
- strings.add(info.table);
+ /**
+ * todo: Longer term we should pass this from client somehow - this would be an optimization; once
+ * that is in place make sure to build and test "writeSet" below using OperationType not LockType
+ */
+ boolean isPartOfDynamicPartitionInsert = true;
+ try {
+ /**
+ * checkLock() must be mutexed against any other checkLock to make sure 2 conflicting locks
+ * are not granted by parallel checkLock() calls.
+ */
+ handle = getMutexAPI().acquireLock(MUTEX_KEY.CheckLock.name());
+ List<LockInfo> locksBeingChecked = getLockInfoFromLockId(dbConn, extLockId);//being acquired now
+ response.setLockid(extLockId);
+
+ LOG.debug("checkLock(): Setting savepoint. extLockId=" + JavaUtils.lockIdToString(extLockId));
+ Savepoint save = dbConn.setSavepoint();//todo: get rid of this
+ StringBuilder query = new StringBuilder("select hl_lock_ext_id, " +
+ "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " +
+ "hl_lock_type, hl_txnid from HIVE_LOCKS where hl_db in (");
+
+ Set<String> strings = new HashSet<String>(locksBeingChecked.size());
+
+ //This the set of entities that the statement represnted by extLockId wants to update
+ List<LockInfo> writeSet = new ArrayList<>();
+
+ for (LockInfo info : locksBeingChecked) {
+ strings.add(info.db);
+ if(!isPartOfDynamicPartitionInsert && info.type == LockType.SHARED_WRITE) {
+ writeSet.add(info);
+ }
}
- }
- if (!sawNull) {
- query.append(" and (hl_table is null or hl_table in(");
- first = true;
+ if(!writeSet.isEmpty()) {
+ if(writeSet.get(0).txnId == 0) {
+ //Write operation always start a txn
+ throw new IllegalStateException("Found Write lock for " + JavaUtils.lockIdToString(extLockId) + " but no txnid");
+ }
+ stmt = dbConn.createStatement();
+ StringBuilder sb = new StringBuilder(" ws_database, ws_table, ws_partition, " +
+ "ws_txnid, ws_commit_id " +
+ "from WRITE_SET where ws_commit_id >= " + writeSet.get(0).txnId + " and (");//see commitTxn() for more info on this inequality
+ for(LockInfo info : writeSet) {
+ sb.append("(ws_database = ").append(quoteString(info.db)).append(" and ws_table = ")
+ .append(quoteString(info.table)).append(" and ws_partition ")
+ .append(info.partition == null ? "is null" : "= " + quoteString(info.partition)).append(") or ");
+ }
+ sb.setLength(sb.length() - 4);//nuke trailing " or "
+ sb.append(")");
+ //1 row is sufficient to know we have to kill the query
+ rs = stmt.executeQuery(addLimitClause(1, sb.toString()));
+ if(rs.next()) {
+ /**
+ * if here, it means we found an already committed txn which overlaps with the current one and
+ * it updated the same resource the current txn wants to update. By First-committer-wins
+ * rule, current txn will not be allowed to commit so may as well kill it now; This is just an
+ * optimization to prevent wasting cluster resources to run a query which is known to be DOA.
+ * {@link #commitTxn(CommitTxnRequest)} has the primary responsibility to ensure this.
+ * checkLock() runs at READ_COMMITTED so you could have another (Hive) txn running commitTxn()
+ * in parallel and thus writing to WRITE_SET. commitTxn() logic is properly mutexed to ensure
+ * that we don't "miss" any WW conflicts. We could've mutexed the checkLock() and commitTxn()
+ * as well but this reduces concurrency for very little gain.
+ * Note that update/delete (which runs as dynamic partition insert) acquires a lock on the table,
+ * but WRITE_SET has entries for actual partitions updated. Thus this optimization will "miss"
+ * the WW conflict but it will be caught in commitTxn() where actual partitions written are known.
+ * This is OK since we want 2 concurrent updates that update different sets of partitions to both commit.
+ */
+ String resourceName = rs.getString(1) + '/' + rs.getString(2);
+ String partName = rs.getString(3);
+ if(partName != null) {
+ resourceName += '/' + partName;
+ }
+
+ String msg = "Aborting " + JavaUtils.txnIdToString(writeSet.get(0).txnId) +
+ " since a concurrent committed transaction [" + JavaUtils.txnIdToString(rs.getLong(4)) + "," + rs.getLong(5) +
+ "] has already updated resouce '" + resourceName + "'";
+ LOG.info(msg);
+ if(abortTxns(dbConn, Collections.singletonList(writeSet.get(0).txnId)) != 1) {
+ throw new IllegalStateException(msg + " FAILED!");
+ }
+ dbConn.commit();
+ throw new TxnAbortedException(msg);
+ }
+ close(rs, stmt, null);
+ }
+
+ boolean first = true;
for (String s : strings) {
if (first) first = false;
else query.append(", ");
@@ -1969,22 +2233,22 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
query.append(s);
query.append('\'');
}
- query.append("))");
+ query.append(")");
- // If any of the partition requests are null, then I need to pull all
- // partition locks for this table.
- sawNull = false;
+ // If any of the table requests are null, then I need to pull all the
+ // table locks for this db.
+ boolean sawNull = false;
strings.clear();
for (LockInfo info : locksBeingChecked) {
- if (info.partition == null) {
+ if (info.table == null) {
sawNull = true;
break;
} else {
- strings.add(info.partition);
+ strings.add(info.table);
}
}
if (!sawNull) {
- query.append(" and (hl_partition is null or hl_partition in(");
+ query.append(" and (hl_table is null or hl_table in(");
first = true;
for (String s : strings) {
if (first) first = false;
@@ -1994,14 +2258,35 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
query.append('\'');
}
query.append("))");
+
+ // If any of the partition requests are null, then I need to pull all
+ // partition locks for this table.
+ sawNull = false;
+ strings.clear();
+ for (LockInfo info : locksBeingChecked) {
+ if (info.partition == null) {
+ sawNull = true;
+ break;
+ } else {
+ strings.add(info.partition);
+ }
+ }
+ if (!sawNull) {
+ query.append(" and (hl_partition is null or hl_partition in(");
+ first = true;
+ for (String s : strings) {
+ if (first) first = false;
+ else query.append(", ");
+ query.append('\'');
+ query.append(s);
+ query.append('\'');
+ }
+ query.append("))");
+ }
}
- }
- query.append(" and hl_lock_ext_id <= ").append(extLockId);
+ query.append(" and hl_lock_ext_id <= ").append(extLockId);
- LOG.debug("Going to execute query <" + query.toString() + ">");
- Statement stmt = null;
- ResultSet rs = null;
- try {
+ LOG.debug("Going to execute query <" + query.toString() + ">");
stmt = dbConn.createStatement();
rs = stmt.executeQuery(query.toString());
SortedSet<LockInfo> lockSet = new TreeSet<LockInfo>(new LockInfoComparator());
@@ -2117,6 +2402,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
response.setState(LockState.ACQUIRED);
} finally {
close(rs, stmt, null);
+ if(handle != null) {
+ handle.releaseLocks();
+ }
}
return response;
}
@@ -2158,7 +2446,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
String s = "update HIVE_LOCKS set hl_lock_state = '" + LOCK_ACQUIRED + "', " +
//if lock is part of txn, heartbeat info is in txn record
"hl_last_heartbeat = " + (isValidTxn(lockInfo.txnId) ? 0 : now) +
- ", hl_acquired_at = " + now + " where hl_lock_ext_id = " +
+ ", hl_acquired_at = " + now + ",HL_BLOCKEDBY_EXT_ID=NULL,HL_BLOCKEDBY_INT_ID=null" + " where hl_lock_ext_id = " +
extLockId + " and hl_lock_int_id = " + lockInfo.intLockId;
LOG.debug("Going to execute update <" + s + ">");
int rc = stmt.executeUpdate(s);
@@ -2238,6 +2526,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
//todo: add LIMIT 1 instead of count - should be more efficient
s = "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TXNID = " + txnid;
ResultSet rs2 = stmt.executeQuery(s);
+ //todo: strictly speaking you can commit an empty txn, thus 2nd conjunct is wrong but only
+ //possible for for multi-stmt txns
boolean alreadyCommitted = rs2.next() && rs2.getInt(1) > 0;
LOG.debug("Going to rollback");
dbConn.rollback();
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 927e9bc..f9cac18 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -47,7 +47,7 @@ import java.util.Set;
@InterfaceStability.Evolving
public interface TxnStore {
- public static enum MUTEX_KEY {Initiator, Cleaner, HouseKeeper, CompactionHistory}
+ public static enum MUTEX_KEY {Initiator, Cleaner, HouseKeeper, CompactionHistory, CheckLock, WriteSetCleaner}
// Compactor states (Should really be enum)
static final public String INITIATED_RESPONSE = "initiated";
static final public String WORKING_RESPONSE = "working";
@@ -321,6 +321,12 @@ public interface TxnStore {
public void purgeCompactionHistory() throws MetaException;
/**
+ * WriteSet tracking is used to ensure proper transaction isolation. This method deletes the
+ * transaction metadata once it becomes unnecessary.
+ */
+ public void performWriteSetGC();
+
+ /**
* Determine if there are enough consecutive failures compacting a table or partition that no
* new automatic compactions should be scheduled. User initiated compactions do not do this
* check.
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index cc9e583..b829d9d 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -69,6 +69,8 @@ public class TxnUtils {
* @return a valid txn list.
*/
public static ValidTxnList createValidCompactTxnList(GetOpenTxnsInfoResponse txns) {
+ //todo: this could be more efficient: using select min(txn_id) from TXNS where txn_state=" +
+ // quoteChar(TXN_OPEN) to compute compute HWM...
long highWater = txns.getTxn_high_water_mark();
long minOpenTxn = Long.MAX_VALUE;
long[] exceptions = new long[txns.getOpen_txnsSize()];
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index 2c1560b..80e3cd6 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -413,7 +413,7 @@ public class TestCompactionTxnHandler {
lc.setTablename(tableName);
LockRequest lr = new LockRequest(Arrays.asList(lc), "me", "localhost");
lr.setTxnid(txnId);
- LockResponse lock = txnHandler.lock(new LockRequest(Arrays.asList(lc), "me", "localhost"));
+ LockResponse lock = txnHandler.lock(lr);
assertEquals(LockState.ACQUIRED, lock.getState());
txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnId, dbName, tableName,
@@ -429,8 +429,8 @@ public class TestCompactionTxnHandler {
assertEquals(dbName, ci.dbname);
assertEquals(tableName, ci.tableName);
switch (i++) {
- case 0: assertEquals("ds=today", ci.partName); break;
- case 1: assertEquals("ds=yesterday", ci.partName); break;
+ case 0: assertEquals("ds=today", ci.partName); break;
+ case 1: assertEquals("ds=yesterday", ci.partName); break;
default: throw new RuntimeException("What?");
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index 28d0269..1a118a9 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -483,6 +483,7 @@ public class TestTxnHandler {
components.clear();
components.add(comp);
req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(openTxn());
res = txnHandler.lock(req);
assertTrue(res.getState() == LockState.ACQUIRED);
}
@@ -514,6 +515,7 @@ public class TestTxnHandler {
components.clear();
components.add(comp);
req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(openTxn());
res = txnHandler.lock(req);
assertTrue(res.getState() == LockState.WAITING);
}
@@ -580,6 +582,7 @@ public class TestTxnHandler {
List<LockComponent> components = new ArrayList<LockComponent>(1);
components.add(comp);
LockRequest req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(openTxn());
LockResponse res = txnHandler.lock(req);
assertTrue(res.getState() == LockState.ACQUIRED);
@@ -602,6 +605,7 @@ public class TestTxnHandler {
List<LockComponent> components = new ArrayList<LockComponent>(1);
components.add(comp);
LockRequest req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(openTxn());
LockResponse res = txnHandler.lock(req);
assertTrue(res.getState() == LockState.ACQUIRED);
@@ -611,6 +615,7 @@ public class TestTxnHandler {
components.clear();
components.add(comp);
req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(openTxn());
res = txnHandler.lock(req);
assertTrue(res.getState() == LockState.WAITING);
@@ -633,6 +638,7 @@ public class TestTxnHandler {
List<LockComponent> components = new ArrayList<LockComponent>(1);
components.add(comp);
LockRequest req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(openTxn());
LockResponse res = txnHandler.lock(req);
assertTrue(res.getState() == LockState.ACQUIRED);
@@ -642,6 +648,7 @@ public class TestTxnHandler {
components.clear();
components.add(comp);
req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(openTxn());
res = txnHandler.lock(req);
assertTrue(res.getState() == LockState.WAITING);
@@ -651,6 +658,7 @@ public class TestTxnHandler {
components.clear();
components.add(comp);
req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(openTxn());
res = txnHandler.lock(req);
assertTrue(res.getState() == LockState.WAITING);
}
@@ -682,6 +690,7 @@ public class TestTxnHandler {
components.clear();
components.add(comp);
req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(openTxn());
res = txnHandler.lock(req);
assertTrue(res.getState() == LockState.WAITING);
}
@@ -725,6 +734,8 @@ public class TestTxnHandler {
List<LockComponent> components = new ArrayList<LockComponent>(1);
components.add(comp);
LockRequest req = new LockRequest(components, "me", "localhost");
+ long txnId = openTxn();
+ req.setTxnid(txnId);
LockResponse res = txnHandler.lock(req);
long lockid1 = res.getLockid();
assertTrue(res.getState() == LockState.ACQUIRED);
@@ -735,11 +746,12 @@ public class TestTxnHandler {
components.clear();
components.add(comp);
req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(openTxn());
res = txnHandler.lock(req);
long lockid2 = res.getLockid();
assertTrue(res.getState() == LockState.WAITING);
- txnHandler.unlock(new UnlockRequest(lockid1));
+ txnHandler.abortTxn(new AbortTxnRequest(txnId));
res = txnHandler.checkLock(new CheckLockRequest(lockid2));
assertTrue(res.getState() == LockState.ACQUIRED);
}
@@ -1070,16 +1082,14 @@ public class TestTxnHandler {
@Test
public void showLocks() throws Exception {
long begining = System.currentTimeMillis();
- long txnid = openTxn();
LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
List<LockComponent> components = new ArrayList<LockComponent>(1);
components.add(comp);
LockRequest req = new LockRequest(components, "me", "localhost");
- req.setTxnid(txnid);
LockResponse res = txnHandler.lock(req);
// Open txn
- txnid = openTxn();
+ long txnid = openTxn();
comp = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "mydb");
comp.setTablename("mytable");
components = new ArrayList<LockComponent>(1);
@@ -1090,7 +1100,7 @@ public class TestTxnHandler {
// Locks not associated with a txn
components = new ArrayList<LockComponent>(1);
- comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "yourdb");
+ comp = new LockComponent(LockType.SHARED_READ, LockLevel.PARTITION, "yourdb");
comp.setTablename("yourtable");
comp.setPartitionname("yourpartition");
components.add(comp);
@@ -1104,14 +1114,13 @@ public class TestTxnHandler {
for (int i = 0; i < saw.length; i++) saw[i] = false;
for (ShowLocksResponseElement lock : locks) {
if (lock.getLockid() == 1) {
- assertEquals(1, lock.getTxnid());
+ assertEquals(0, lock.getTxnid());
assertEquals("mydb", lock.getDbname());
assertNull(lock.getTablename());
assertNull(lock.getPartname());
assertEquals(LockState.ACQUIRED, lock.getState());
assertEquals(LockType.EXCLUSIVE, lock.getType());
- assertTrue(lock.toString(), 0 == lock.getLastheartbeat() &&
- lock.getTxnid() != 0);
+ assertTrue(lock.toString(), 0 != lock.getLastheartbeat());
assertTrue("Expected acquired at " + lock.getAcquiredat() + " to be between " + begining
+ " and " + System.currentTimeMillis(),
begining <= lock.getAcquiredat() && System.currentTimeMillis() >= lock.getAcquiredat());
@@ -1119,7 +1128,7 @@ public class TestTxnHandler {
assertEquals("localhost", lock.getHostname());
saw[0] = true;
} else if (lock.getLockid() == 2) {
- assertEquals(2, lock.getTxnid());
+ assertEquals(1, lock.getTxnid());
assertEquals("mydb", lock.getDbname());
assertEquals("mytable", lock.getTablename());
assertNull(lock.getPartname());
@@ -1137,7 +1146,7 @@ public class TestTxnHandler {
assertEquals("yourtable", lock.getTablename());
assertEquals("yourpartition", lock.getPartname());
assertEquals(LockState.ACQUIRED, lock.getState());
- assertEquals(LockType.SHARED_WRITE, lock.getType());
+ assertEquals(LockType.SHARED_READ, lock.getType());
assertTrue(lock.toString(), begining <= lock.getLastheartbeat() &&
System.currentTimeMillis() >= lock.getLastheartbeat());
assertTrue(begining <= lock.getAcquiredat() &&
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 1de3309..52dadb7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -377,7 +377,7 @@ public enum ErrorMsg {
"instantiated, check hive.txn.manager"),
TXN_NO_SUCH_TRANSACTION(10262, "No record of transaction {0} could be found, " +
"may have timed out", true),
- TXN_ABORTED(10263, "Transaction manager has aborted the transaction {0}.", true),
+ TXN_ABORTED(10263, "Transaction manager has aborted the transaction {0}. Reason: {1}", true),
DBTXNMGR_REQUIRES_CONCURRENCY(10264,
"To use DbTxnManager you must set hive.support.concurrency=true"),
TXNMGR_NOT_ACID(10265, "This command is not allowed on an ACID table {0}.{1} with a non-ACID transaction manager", true),
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
index 7fa57d6..18ed864 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
@@ -172,8 +172,9 @@ public class DbLockManager implements HiveLockManager{
LOG.error("Metastore could not find " + JavaUtils.txnIdToString(lock.getTxnid()));
throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(lock.getTxnid()));
} catch (TxnAbortedException e) {
- LOG.error("Transaction " + JavaUtils.txnIdToString(lock.getTxnid()) + " already aborted.");
- throw new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(lock.getTxnid()));
+ LockException le = new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(lock.getTxnid()), e.getMessage());
+ LOG.error(le.getMessage());
+ throw le;
} catch (TException e) {
throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(),
e);
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 3aec8eb..9c2a346 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -107,6 +107,8 @@ public class DbTxnManager extends HiveTxnManagerImpl {
@Override
public long openTxn(String user) throws LockException {
+ //todo: why don't we lock the snapshot here??? Instead of having client make an explicit call
+ //whenever it chooses
init();
if(isTxnOpen()) {
throw new LockException("Transaction already opened. " + JavaUtils.txnIdToString(txnId));
@@ -132,8 +134,17 @@ public class DbTxnManager extends HiveTxnManagerImpl {
@Override
public void acquireLocks(QueryPlan plan, Context ctx, String username) throws LockException {
- acquireLocks(plan, ctx, username, true);
- startHeartbeat();
+ try {
+ acquireLocks(plan, ctx, username, true);
+ startHeartbeat();
+ }
+ catch(LockException e) {
+ if(e.getCause() instanceof TxnAbortedException) {
+ txnId = 0;
+ statementId = -1;
+ }
+ throw e;
+ }
}
/**
@@ -157,7 +168,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
// For each source to read, get a shared lock
for (ReadEntity input : plan.getInputs()) {
if (!input.needsLock() || input.isUpdateOrDelete()) {
- // We don't want to acquire readlocks during update or delete as we'll be acquiring write
+ // We don't want to acquire read locks during update or delete as we'll be acquiring write
// locks instead.
continue;
}
@@ -320,8 +331,9 @@ public class DbTxnManager extends HiveTxnManagerImpl {
LOG.error("Metastore could not find " + JavaUtils.txnIdToString(txnId));
throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(txnId));
} catch (TxnAbortedException e) {
- LOG.error("Transaction " + JavaUtils.txnIdToString(txnId) + " aborted");
- throw new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(txnId));
+ LockException le = new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(txnId), e.getMessage());
+ LOG.error(le.getMessage());
+ throw le;
} catch (TException e) {
throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(),
e);
@@ -389,8 +401,9 @@ public class DbTxnManager extends HiveTxnManagerImpl {
LOG.error("Unable to find transaction " + JavaUtils.txnIdToString(txnId));
throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(txnId));
} catch (TxnAbortedException e) {
- LOG.error("Transaction aborted " + JavaUtils.txnIdToString(txnId));
- throw new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(txnId));
+ LockException le = new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(txnId), e.getMessage());
+ LOG.error(le.getMessage());
+ throw le;
} catch (TException e) {
throw new LockException(
ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg() + "(" + JavaUtils.txnIdToString(txnId)
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidWriteSetService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidWriteSetService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidWriteSetService.java
new file mode 100644
index 0000000..9085a6a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidWriteSetService.java
@@ -0,0 +1,61 @@
+package org.apache.hadoop.hive.ql.txn;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.txn.compactor.HouseKeeperServiceBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Periodically cleans WriteSet tracking information used in Transaction management
+ */
+public class AcidWriteSetService extends HouseKeeperServiceBase {
+ private static final Logger LOG = LoggerFactory.getLogger(AcidWriteSetService.class);
+ @Override
+ protected long getStartDelayMs() {
+ return 0;
+ }
+ @Override
+ protected long getIntervalMs() {
+ return hiveConf.getTimeVar(HiveConf.ConfVars.WRITE_SET_REAPER_INTERVAL, TimeUnit.MILLISECONDS);
+ }
+ @Override
+ protected Runnable getScheduedAction(HiveConf hiveConf, AtomicInteger isAliveCounter) {
+ return new WriteSetReaper(hiveConf, isAliveCounter);
+ }
+ @Override
+ public String getServiceDescription() {
+ return "Periodically cleans obsolete WriteSet tracking information used in Transaction management";
+ }
+ private static final class WriteSetReaper implements Runnable {
+ private final TxnStore txnHandler;
+ private final AtomicInteger isAliveCounter;
+ private WriteSetReaper(HiveConf hiveConf, AtomicInteger isAliveCounter) {
+ txnHandler = TxnUtils.getTxnStore(hiveConf);
+ this.isAliveCounter = isAliveCounter;
+ }
+ @Override
+ public void run() {
+ TxnStore.MutexAPI.LockHandle handle = null;
+ try {
+ handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.WriteSetCleaner.name());
+ long startTime = System.currentTimeMillis();
+ txnHandler.performWriteSetGC();
+ int count = isAliveCounter.incrementAndGet();
+ LOG.info("cleaner ran for " + (System.currentTimeMillis() - startTime)/1000 + "seconds. isAliveCounter=" + count);
+ }
+ catch(Throwable t) {
+ LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t);
+ }
+ finally {
+ if(handle != null) {
+ handle.releaseLocks();
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java
index 947f17c..caab10d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java
@@ -81,7 +81,7 @@ public abstract class HouseKeeperServiceBase implements HouseKeeperService {
*/
protected abstract long getStartDelayMs();
/**
- * Determines how fequently the service is running its task.
+ * Determines how frequently the service is running its task.
*/
protected abstract long getIntervalMs();
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index abbe5d4..949cbd5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -147,7 +147,7 @@ public class Initiator extends CompactorThread {
if (compactionNeeded != null) requestCompaction(ci, runAs, compactionNeeded);
} catch (Throwable t) {
LOG.error("Caught exception while trying to determine if we should compact " +
- ci + ". Marking clean to avoid repeated failures, " +
+ ci + ". Marking failed to avoid repeated failures, " +
"" + StringUtils.stringifyException(t));
txnHandler.markFailed(ci);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 6238e2b..767c10c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -182,7 +182,7 @@ public class Worker extends CompactorThread {
txnHandler.markCompacted(ci);
} catch (Exception e) {
LOG.error("Caught exception while trying to compact " + ci +
- ". Marking clean to avoid repeated failures, " + StringUtils.stringifyException(e));
+ ". Marking failed to avoid repeated failures, " + StringUtils.stringifyException(e));
txnHandler.markFailed(ci);
}
} catch (Throwable t) {
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 1030987..472da0b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -669,7 +669,7 @@ public class TestTxnCommands2 {
t.run();
}
- private static void runHouseKeeperService(HouseKeeperService houseKeeperService, HiveConf conf) throws Exception {
+ public static void runHouseKeeperService(HouseKeeperService houseKeeperService, HiveConf conf) throws Exception {
int lastCount = houseKeeperService.getIsAliveCounter();
houseKeeperService.start(conf);
int maxIter = 10;
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index f87dd14..83a2ba3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -65,6 +65,26 @@ public class TestAcidUtils {
assertEquals("/tmp/delta_0000100_0000200_0007/bucket_00023",
AcidUtils.createFilename(p, options).toString());
}
+ @Test
+ public void testCreateFilenameLargeIds() throws Exception {
+ Path p = new Path("/tmp");
+ Configuration conf = new Configuration();
+ AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
+ .setOldStyle(true).bucket(123456789);
+ assertEquals("/tmp/123456789_0",
+ AcidUtils.createFilename(p, options).toString());
+ options.bucket(23)
+ .minimumTransactionId(1234567880)
+ .maximumTransactionId(1234567890)
+ .writingBase(true)
+ .setOldStyle(false);
+ assertEquals("/tmp/base_1234567890/bucket_00023",
+ AcidUtils.createFilename(p, options).toString());
+ options.writingBase(false);
+ assertEquals("/tmp/delta_1234567880_1234567890_0000/bucket_00023",
+ AcidUtils.createFilename(p, options).toString());
+ }
+
@Test
public void testParsing() throws Exception {
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index 3a6e76e..22f7482 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.ql.Context;
@@ -500,6 +501,12 @@ public class TestDbTxnManager {
partCols.add(fs);
t.setPartCols(partCols);
}
+ Map<String, String> tblProps = t.getParameters();
+ if(tblProps == null) {
+ tblProps = new HashMap<>();
+ }
+ tblProps.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true");
+ t.setParameters(tblProps);
return t;
}
[05/13] hive git commit: HIVE-13619: Bucket map join plan is
incorrect (Vikram Dixit K, reviewed by Gunther Hagleitner)
Posted by jd...@apache.org.
HIVE-13619: Bucket map join plan is incorrect (Vikram Dixit K, reviewed by Gunther Hagleitner)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4eb96030
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4eb96030
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4eb96030
Branch: refs/heads/llap
Commit: 4eb960305f6cf30aa6e1011ee09388b1ab4c4fd9
Parents: da82819
Author: vikram <vi...@hortonworks.com>
Authored: Thu May 5 14:35:58 2016 -0700
Committer: vikram <vi...@hortonworks.com>
Committed: Thu May 5 14:35:58 2016 -0700
----------------------------------------------------------------------
ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/4eb96030/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
index 41507b1..a8ed74c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
@@ -83,7 +83,7 @@ public class OperatorUtils {
public static <T> T findSingleOperatorUpstreamJoinAccounted(Operator<?> start, Class<T> clazz) {
Set<T> found = findOperatorsUpstreamJoinAccounted(start, clazz, new HashSet<T>());
- return found.size() == 1 ? found.iterator().next(): null;
+ return found.size() >= 1 ? found.iterator().next(): null;
}
public static <T> Set<T> findOperatorsUpstream(Collection<Operator<?>> starts, Class<T> clazz) {