Posted to commits@hive.apache.org by jd...@apache.org on 2016/05/06 17:18:51 UTC

[01/13] hive git commit: HIVE-13395 Lost Update problem in ACID (Eugene Koifman, reviewed by Alan Gates)

Repository: hive
Updated Branches:
  refs/heads/llap 89ec219e1 -> f089f2e64


http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index e94af55..c956d78 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -17,7 +17,13 @@
  */
 package org.apache.hadoop.hive.ql.lockmgr;
 
-import junit.framework.Assert;
+import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.TestTxnCommands2;
+import org.apache.hadoop.hive.ql.txn.AcidWriteSetService;
+import org.junit.After;
+import org.junit.Assert;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.LockState;
 import org.apache.hadoop.hive.metastore.api.LockType;
@@ -29,23 +35,32 @@ import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 /**
  * See additional tests in {@link org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager}
  * Tests here are "end-to-end"ish and simulate concurrent queries.
+ *
+ * The general approach is to use an instance of Driver: Driver.run() creates tables and
+ * Driver.compile() generates a QueryPlan, which can then be passed to HiveTxnManager.acquireLocks().
+ * The same HiveTxnManager is used for openTxn()/commitTxn(), etc.  This exercises almost the entire
+ * code path that the CLI would, with the advantage that you can create a 2nd HiveTxnManager and
+ * simulate interleaved transactional/locking operations, all from within a single thread.
+ * The latter not only controls concurrency precisely but is the only way to run in a UT env with DerbyDB (a sketch of this pattern follows the diff).
  */
 public class TestDbTxnManager2 {
   private static HiveConf conf = new HiveConf(Driver.class);
   private HiveTxnManager txnMgr;
   private Context ctx;
   private Driver driver;
+  TxnStore txnHandler;
 
   @BeforeClass
   public static void setUpClass() throws Exception {
@@ -60,15 +75,17 @@ public class TestDbTxnManager2 {
     driver.init();
     TxnDbUtil.cleanDb();
     TxnDbUtil.prepDb();
-    txnMgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    SessionState ss = SessionState.get();
+    ss.initTxnMgr(conf);
+    txnMgr = ss.getTxnMgr();
     Assert.assertTrue(txnMgr instanceof DbTxnManager);
+    txnHandler = TxnUtils.getTxnStore(conf);
+
   }
   @After
   public void tearDown() throws Exception {
     driver.close();
     if (txnMgr != null) txnMgr.closeTxnManager();
-    TxnDbUtil.cleanDb();
-    TxnDbUtil.prepDb();
   }
   @Test
   public void testLocksInSubquery() throws Exception {
@@ -192,22 +209,24 @@ public class TestDbTxnManager2 {
     checkCmdOnDriver(cpr);
     cpr = driver.compileAndRespond("update temp.T7 set a = 5 where b = 6");
     checkCmdOnDriver(cpr);
+    txnMgr.openTxn("Fifer");
     txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");
-    List<HiveLock> updateLocks = ctx.getHiveLocks();
-    cpr = driver.compileAndRespond("drop database if exists temp");
-    LockState lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Fiddler", false);//gets SS lock on T7
+    checkCmdOnDriver(driver.compileAndRespond("drop database if exists temp"));
+    HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    //txnMgr2.openTxn("Fiddler");
+    ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Fiddler", false);//gets SS lock on T7
     List<ShowLocksResponseElement> locks = getLocks();
     Assert.assertEquals("Unexpected lock count", 2, locks.size());
     checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "temp", "T7", null, locks.get(0));
     checkLock(LockType.EXCLUSIVE, LockState.WAITING, "temp", null, null, locks.get(1));
-    txnMgr.getLockManager().releaseLocks(updateLocks);
-    lockState = ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(1).getLockid());
+    txnMgr.commitTxn();
+    ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(1).getLockid());
     locks = getLocks();
     Assert.assertEquals("Unexpected lock count", 1, locks.size());
     checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "temp", null, null, locks.get(0));
     List<HiveLock> xLock = new ArrayList<HiveLock>(0);
     xLock.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
-    txnMgr.getLockManager().releaseLocks(xLock);
+    txnMgr2.getLockManager().releaseLocks(xLock);
   }
   @Test
   public void updateSelectUpdate() throws Exception {
@@ -215,29 +234,27 @@ public class TestDbTxnManager2 {
     checkCmdOnDriver(cpr);
     cpr = driver.compileAndRespond("delete from T8 where b = 89");
     checkCmdOnDriver(cpr);
+    txnMgr.openTxn("Fifer");
     txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");//gets SS lock on T8
-    List<HiveLock> deleteLocks = ctx.getHiveLocks();
     cpr = driver.compileAndRespond("select a from T8");//gets S lock on T8
     checkCmdOnDriver(cpr);
-    txnMgr.acquireLocks(driver.getPlan(), ctx, "Fiddler");
-    cpr = driver.compileAndRespond("update T8 set a = 1 where b = 1");
-    checkCmdOnDriver(cpr);
-    LockState lockState = ((DbTxnManager) txnMgr).acquireLocks(driver.getPlan(), ctx, "Practical", false);//waits for SS lock on T8 from fifer
+    HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    txnMgr2.openTxn("Fiddler");
+    txnMgr2.acquireLocks(driver.getPlan(), ctx, "Fiddler");
+    checkCmdOnDriver(driver.compileAndRespond("update T8 set a = 1 where b = 1"));
+    ((DbTxnManager) txnMgr2).acquireLocks(driver.getPlan(), ctx, "Practical", false);//waits for SS lock on T8 from fifer
     List<ShowLocksResponseElement> locks = getLocks();
     Assert.assertEquals("Unexpected lock count", 3, locks.size());
     checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T8", null, locks.get(0));
     checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "T8", null, locks.get(1));
     checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "T8", null, locks.get(2));
-    txnMgr.getLockManager().releaseLocks(deleteLocks);
-    lockState = ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());
+    txnMgr.rollbackTxn();
+    ((DbLockManager)txnMgr2.getLockManager()).checkLock(locks.get(2).getLockid());
     locks = getLocks();
     Assert.assertEquals("Unexpected lock count", 2, locks.size());
     checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T8", null, locks.get(0));
     checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "T8", null, locks.get(1));
-    List<HiveLock> relLocks = new ArrayList<HiveLock>(2);
-    relLocks.add(new DbLockManager.DbHiveLock(locks.get(0).getLockid()));
-    relLocks.add(new DbLockManager.DbHiveLock(locks.get(1).getLockid()));
-    txnMgr.getLockManager().releaseLocks(relLocks);
+    txnMgr2.commitTxn();
     cpr = driver.run("drop table if exists T6");
     locks = getLocks();
     Assert.assertEquals("Unexpected number of locks found", 0, locks.size());
@@ -617,12 +634,12 @@ public class TestDbTxnManager2 {
     txnMgr.getLockManager().releaseLocks(relLocks);
   }
 
-  private void checkLock(LockType type, LockState state, String db, String table, String partition, ShowLocksResponseElement l) {
-    Assert.assertEquals(l.toString(),l.getType(), type);
-    Assert.assertEquals(l.toString(),l.getState(), state);
-    Assert.assertEquals(l.toString(), normalizeCase(l.getDbname()), normalizeCase(db));
-    Assert.assertEquals(l.toString(), normalizeCase(l.getTablename()), normalizeCase(table));
-    Assert.assertEquals(l.toString(), normalizeCase(l.getPartname()), normalizeCase(partition));
+  private void checkLock(LockType expectedType, LockState expectedState, String expectedDb, String expectedTable, String expectedPartition, ShowLocksResponseElement actual) {
+    Assert.assertEquals(actual.toString(), expectedType, actual.getType());
+    Assert.assertEquals(actual.toString(), expectedState, actual.getState());
+    Assert.assertEquals(actual.toString(), normalizeCase(expectedDb), normalizeCase(actual.getDbname()));
+    Assert.assertEquals(actual.toString(), normalizeCase(expectedTable), normalizeCase(actual.getTablename()));
+    Assert.assertEquals(actual.toString(), normalizeCase(expectedPartition), normalizeCase(actual.getPartname()));
   }
   private void checkCmdOnDriver(CommandProcessorResponse cpr) {
     Assert.assertTrue(cpr.toString(), cpr.getResponseCode() == 0);
@@ -637,4 +654,541 @@ public class TestDbTxnManager2 {
     ShowLocksResponse rsp = ((DbLockManager)txnMgr.getLockManager()).getLocks();
     return rsp.getLocks();
   }
+
+  /**
+   * txns update same resource but do not overlap in time - no conflict
+   */
+  @Test
+  public void testWriteSetTracking1() throws Exception {
+    CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " +
+      "partitioned by (p string) clustered by (a) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    checkCmdOnDriver(cpr);
+
+    checkCmdOnDriver(driver.compileAndRespond("select * from TAB_PART"));
+    txnMgr.openTxn("Nicholas");
+    checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
+    txnMgr.acquireLocks(driver.getPlan(), ctx, "Nicholas");
+    HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    txnMgr.commitTxn();
+    txnMgr2.openTxn("Alexandra");
+    checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
+    txnMgr2.acquireLocks(driver.getPlan(), ctx, "Nicholas");
+    txnMgr2.commitTxn();
+  }
+  /**
+   * txns overlap in time but do not update same resource - no conflict
+   */
+  @Test
+  public void testWriteSetTracking2() throws Exception {
+    CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " +
+      "partitioned by (p string) clustered by (a) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("create table if not exists TAB2 (a int, b int) partitioned by (p string) " +
+      "clustered by (a) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    checkCmdOnDriver(cpr);
+
+    HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    txnMgr.openTxn("Peter");
+    checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
+    txnMgr.acquireLocks(driver.getPlan(), ctx, "Peter");
+    txnMgr2.openTxn("Catherine");
+    List<ShowLocksResponseElement> locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 1, locks.size());
+    //note that "update" uses dynamic partitioning thus lock is on the table not partition
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0));
+    txnMgr.commitTxn();
+    checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 9 where p = 'doh'"));
+    txnMgr2.acquireLocks(driver.getPlan(), ctx, "Catherine");
+    txnMgr2.commitTxn();
+  }
+
+  /**
+   * txns overlap and update the same resource - can't commit 2nd txn
+   */
+  @Test
+  public void testWriteSetTracking3() throws Exception {
+    CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " +
+      "partitioned by (p string) clustered by (a) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    checkCmdOnDriver(cpr);
+    HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+
+    txnMgr.openTxn("Known");
+    txnMgr2.openTxn("Unknown");
+    checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
+    txnMgr.acquireLocks(driver.getPlan(), ctx, "Known");
+    List<ShowLocksResponseElement> locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 1, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0));
+    checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
+    ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Unknown", false);
+    locks = getLocks(txnMgr2);//should not matter which txnMgr is used here
+    Assert.assertEquals("Unexpected lock count", 2, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0));
+    checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB_PART", null, locks.get(1));
+    txnMgr.commitTxn();
+    LockException expectedException = null;
+    try {
+      txnMgr2.commitTxn();
+    }
+    catch (LockException e) {
+      expectedException = e;
+    }
+    Assert.assertTrue("Didn't get exception", expectedException != null);
+    Assert.assertEquals("Got wrong message code", ErrorMsg.TXN_ABORTED, expectedException.getCanonicalErrorMsg());
+    Assert.assertEquals("Exception msg didn't match", 
+      "Aborting [txnid:2,2] due to a write conflict on default/tab_part committed by [txnid:1,2]",
+      expectedException.getCause().getMessage());
+  }
+  /**
+   * txns overlap, update same resource, simulate multi-stmt txn case
+   * Also tests that we kill txn when it tries to acquire lock if we already know it will not be committed
+   */
+  @Test
+  public void testWriteSetTracking4() throws Exception {
+    Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+    CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " +
+      "partitioned by (p string) clustered by (a) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    checkCmdOnDriver(cpr);
+    cpr = driver.run("create table if not exists TAB2 (a int, b int) partitioned by (p string) " +
+      "clustered by (a) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    checkCmdOnDriver(cpr);
+    
+    txnMgr.openTxn("Long Running");
+    checkCmdOnDriver(driver.compileAndRespond("select a from  TAB_PART where p = 'blah'"));
+    txnMgr.acquireLocks(driver.getPlan(), ctx, "Long Running");
+    List<ShowLocksResponseElement> locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 1, locks.size());
+    //for some reason this just locks the table; if I alter table to add this partition, then 
+    //we end up locking both table and partition with share_read.  (Plan has 2 ReadEntities)...?
+    //same for other locks below
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0));
+
+    HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    txnMgr2.openTxn("Short Running");
+    checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 7 where p = 'blah'"));//no such partition
+    txnMgr2.acquireLocks(driver.getPlan(), ctx, "Short Running");
+    locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 2, locks.size());
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0));
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", null, locks.get(1));
+    //update stmt has p=blah, thus nothing is actually updated and we generate an empty dyn part list
+    Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+    txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(),
+      "default", "tab2", Collections.EMPTY_LIST));
+    txnMgr2.commitTxn();
+    //Short Running updated nothing, so we expect 0 rows in WRITE_SET
+    Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+
+    txnMgr2.openTxn("T3");
+    checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 7 where p = 'two'"));//pretend this partition exists
+    txnMgr2.acquireLocks(driver.getPlan(), ctx, "T3");
+    locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 2, locks.size());
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0));
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", null, locks.get(1));//since TAB2 is empty
+    //nothing is actually updated (TAB2 is empty), so WRITE_SET is still empty before the simulated update
+    Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+    txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(),
+      "default", "tab2", Collections.singletonList("p=two")));//simulate partition update
+    txnMgr2.commitTxn();
+    Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+      1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+    
+    AcidWriteSetService houseKeeper = new AcidWriteSetService();
+    TestTxnCommands2.runHouseKeeperService(houseKeeper, conf);
+    //since T3 overlaps with Long Running (still open) GC does nothing
+    Assert.assertEquals(1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+    checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 17 where a = 1"));//no rows match
+    txnMgr.acquireLocks(driver.getPlan(), ctx, "Long Running");
+    //so generate empty Dyn Part call
+    txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(),
+      "default", "tab2", Collections.EMPTY_LIST));     
+    txnMgr.commitTxn();
+
+    locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 0, locks.size());
+    TestTxnCommands2.runHouseKeeperService(houseKeeper, conf);
+    Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+  }
+  /**
+   * overlapping txns updating the same resource but 1st one rolls back; 2nd commits
+   * @throws Exception
+   */
+  @Test
+  public void testWriteSetTracking5() throws Exception {
+    Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+    CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " +
+      "partitioned by (p string) clustered by (a) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    checkCmdOnDriver(cpr);
+    HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+
+    txnMgr.openTxn("Known");
+    txnMgr2.openTxn("Unknown");
+    checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
+    txnMgr.acquireLocks(driver.getPlan(), ctx, "Known");
+    List<ShowLocksResponseElement> locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 1, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0));
+    checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
+    ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Unknown", false);
+    locks = getLocks(txnMgr2);//should not matter which txnMgr is used here
+    Assert.assertEquals("Unexpected lock count", 2, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks.get(0));
+    checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB_PART", null, locks.get(1));
+    txnMgr.rollbackTxn();
+    Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+    txnMgr2.commitTxn();//since conflicting txn rolled back, commit succeeds
+    Assert.assertEquals(1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+  }
+  /**
+   * check that read query concurrent with txn works ok
+   */
+  @Test
+  public void testWriteSetTracking6() throws Exception {
+    Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+    CommandProcessorResponse cpr = driver.run("create table if not exists TAB2(a int, b int) clustered " +
+      "by (a) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    checkCmdOnDriver(cpr);
+    checkCmdOnDriver(driver.compileAndRespond("select * from TAB2 where a = 113"));
+    txnMgr.acquireLocks(driver.getPlan(), ctx, "Works");
+    List<ShowLocksResponseElement> locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 1, locks.size());
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB2", null, locks.get(0));
+    HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    txnMgr2.openTxn("Horton");
+    checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 17 where a = 101"));
+    txnMgr2.acquireLocks(driver.getPlan(), ctx, "Horton");
+    Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+    locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 2, locks.size());
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB2", null, locks.get(0));
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", null, locks.get(1));
+    txnMgr2.commitTxn();//no conflict
+    Assert.assertEquals(1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+    locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 1, locks.size());
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB2", null, locks.get(0));
+    TestTxnCommands2.runHouseKeeperService(new AcidWriteSetService(), conf);
+    Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+  }
+
+  /**
+   * 2 concurrent txns update different partitions of the same table and succeed
+   * @throws Exception
+   */
+  @Test
+  public void testWriteSetTracking7() throws Exception {
+    Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
+    CommandProcessorResponse cpr = driver.run("create table if not exists tab2 (a int, b int) " +
+      "partitioned by (p string) clustered by (a) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    checkCmdOnDriver(cpr);
+    checkCmdOnDriver(driver.run("insert into tab2 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1
+    HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+
+    //test with predicates such that partition pruning works
+    txnMgr2.openTxn("T2");
+    checkCmdOnDriver(driver.compileAndRespond("update tab2 set b = 7 where p='two'"));
+    txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
+    List<ShowLocksResponseElement> locks = getLocks(txnMgr2);
+    Assert.assertEquals("Unexpected lock count", 1, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=two", locks.get(0));
+
+    //now start concurrent txn
+    txnMgr.openTxn("T3");
+    checkCmdOnDriver(driver.compileAndRespond("update tab2 set b = 7 where p='one'"));
+    ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
+    locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 2, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=two", locks.get(0));
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=one", locks.get(1));
+    
+    //this simulates the completion of txnid:2
+    txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab2",
+      Collections.singletonList("p=two")));
+    txnMgr2.commitTxn();//txnid:2
+    
+    locks = getLocks(txnMgr2);
+    Assert.assertEquals("Unexpected lock count", 1, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB2", "p=one", locks.get(0));
+    //completion of txnid:3
+    txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab2",
+      Collections.singletonList("p=one")));
+    txnMgr.commitTxn();//txnid:3
+    //now both txns concurrently updated TAB2 but different partitions.
+    
+    Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+      1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u'"));
+    Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+      1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='u'"));
+    //2 from txnid:1, 1 from txnid:2, 1 from txnid:3
+    Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+      4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab2' and ctc_partition is not null"));
+    
+    //================
+    //test with predicates such that partition pruning doesn't kick in
+    cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
+      "clustered by (a) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    checkCmdOnDriver(cpr);
+    checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:4
+    txnMgr2.openTxn("T5");
+    checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1"));
+    txnMgr2.acquireLocks(driver.getPlan(), ctx, "T5");
+    locks = getLocks(txnMgr2);
+    Assert.assertEquals("Unexpected lock count", 2, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+
+    //now start concurrent txn
+    txnMgr.openTxn("T6");
+    checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b = 2"));
+    ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T6", false);
+    locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 4, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+    checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(2));
+    checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=one", locks.get(3));
+
+    //this simulates the completion of txnid:5
+    txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
+      Collections.singletonList("p=one")));
+    txnMgr2.commitTxn();//txnid:5
+
+    ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id)
+    locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 2, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+    //completion of txnid:6
+    txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
+      Collections.singletonList("p=two")));
+    txnMgr.commitTxn();//txnid:6
+
+    Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+      1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u' and ws_table='tab1'"));
+    Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+      1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='u' and ws_table='tab1'"));
+    //2 from insert + 1 for each update stmt
+    Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+      4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null"));
+  }
+  /**
+   * Concurrent updates with partition pruning predicate and w/o one
+   */
+  @Test
+  public void testWriteSetTracking8() throws Exception {
+    CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
+      "clustered by (a) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    checkCmdOnDriver(cpr);
+    checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1
+    HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    txnMgr2.openTxn("T2");
+    checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1"));
+    txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
+    List<ShowLocksResponseElement> locks = getLocks(txnMgr2);
+    Assert.assertEquals("Unexpected lock count", 2, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+
+    //now start concurrent txn
+    txnMgr.openTxn("T3");
+    checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where p='two'"));
+    ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
+    locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 3, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+    checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(2));
+
+    //this simulates the completion of txnid:2
+    txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
+      Collections.singletonList("p=one")));
+    txnMgr2.commitTxn();//txnid:2
+
+    ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id)
+    locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 1, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+    //completion of txnid:3
+    txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
+      Collections.singletonList("p=two")));
+    txnMgr.commitTxn();//txnid:3
+
+    Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+      1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u' and ws_table='tab1'"));
+    Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+      1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='u' and ws_table='tab1'"));
+    Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+      4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null"));
+  }
+  /**
+   * Concurrent update/delete of different partitions - should pass
+   */
+  @Test
+  public void testWriteSetTracking9() throws Exception {
+    CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
+      "clustered by (a) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    checkCmdOnDriver(cpr);
+    checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1
+    HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    txnMgr2.openTxn("T2");
+    checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=1"));
+    txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
+    List<ShowLocksResponseElement> locks = getLocks(txnMgr2);
+    Assert.assertEquals("Unexpected lock count", 2, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+
+    //now start concurrent txn
+    txnMgr.openTxn("T3");
+    checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2"));
+    ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
+    locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 3, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+    checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(2));
+
+    //this simulates the completion of txnid:2
+    txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
+      Collections.singletonList("p=one")));
+    txnMgr2.commitTxn();//txnid:2
+
+    ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id)
+    locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 1, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+    //completion of txnid:3
+    txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
+      Collections.singletonList("p=two")));
+    txnMgr.commitTxn();//txnid:3
+
+    Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+      1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=one' and ws_operation_type='u' and ws_table='tab1'"));
+    Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+      1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='u' and ws_table='tab1'"));
+    Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+      4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null"));
+  }
+  /**
+   * Concurrent update/delete of same partition - should fail to commit
+   */
+  @Test
+  public void testWriteSetTracking10() throws Exception {
+    CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
+      "clustered by (a) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    checkCmdOnDriver(cpr);
+    checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1
+    HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    txnMgr2.openTxn("T2");
+    checkCmdOnDriver(driver.compileAndRespond("update tab1 set b = 7 where b=2"));
+    txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
+    List<ShowLocksResponseElement> locks = getLocks(txnMgr2);
+    Assert.assertEquals("Unexpected lock count", 2, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+
+    //now start concurrent txn
+    txnMgr.openTxn("T3");
+    checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2"));
+    ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
+    locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 3, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+    checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(2));
+
+    //this simulates the completion of txnid:2
+    txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
+      Collections.singletonList("p=two")));
+    txnMgr2.commitTxn();//txnid:2
+
+    ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id)
+    locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 1, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+    //completion of txnid:3
+    txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
+      Collections.singletonList("p=two")));
+    LockException exception = null;
+    try {
+      txnMgr.commitTxn();//txnid:3
+    }
+    catch(LockException e) {
+      exception = e;
+    }
+    Assert.assertNotEquals("Expected exception", null, exception);
+    Assert.assertEquals("Exception msg doesn't match",
+      "Aborting [txnid:3,3] due to a write conflict on default/tab1/p=two committed by [txnid:2,3]",
+      exception.getCause().getMessage());
+
+    Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+      1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='u' and ws_table='tab1'"));
+    Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+      3, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null"));
+  }
+  /**
+   * Concurrent delete/delete of same partition - should pass
+   * This test doesn't work yet, because we don't yet pass in operation type
+   * 
+   * todo: Concurrent insert/update of same partition - should pass
+   */
+  @Ignore("HIVE-13622")
+  @Test
+  public void testWriteSetTracking11() throws Exception {
+    CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
+      "clustered by (a) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    checkCmdOnDriver(cpr);
+    checkCmdOnDriver(driver.run("insert into tab1 partition(p)(a,b,p) values(1,1,'one'),(2,2,'two')"));//txnid:1
+    HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    txnMgr2.openTxn("T2");
+    checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where b=2"));
+    txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2");
+    List<ShowLocksResponseElement> locks = getLocks(txnMgr2);
+    Assert.assertEquals("Unexpected lock count", 2, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+
+    //now start concurrent txn
+    txnMgr.openTxn("T3");
+    checkCmdOnDriver(driver.compileAndRespond("delete from tab1 where p='two' and b=2"));
+    ((DbTxnManager)txnMgr).acquireLocks(driver.getPlan(), ctx, "T3", false);
+    locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 3, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=one", locks.get(1));
+    checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB1", "p=two", locks.get(2));
+
+    //this simulates the completion of txnid:2
+    txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), "default", "tab1",
+      Collections.singletonList("p=two")));
+    txnMgr2.commitTxn();//txnid:2
+
+    ((DbLockManager)txnMgr.getLockManager()).checkLock(locks.get(2).getLockid());//retest WAITING locks (both have same ext id)
+    locks = getLocks(txnMgr);
+    Assert.assertEquals("Unexpected lock count", 1, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB1", "p=two", locks.get(0));
+    //completion of txnid:3
+    txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnMgr.getCurrentTxnId(), "default", "tab1",
+      Collections.singletonList("p=two")));
+    LockException exception = null;
+    try {
+      txnMgr.commitTxn();//txnid:3
+    }
+    catch(LockException e) {
+      exception = e;
+    }
+    Assert.assertNotEquals("Expected exception", null, exception);
+    Assert.assertEquals("Exception msg doesn't match",
+      "Aborting [txnid:3,3] due to a write conflict on default/tab1/p=two committed by [txnid:2,3]",
+      exception.getCause().getMessage());
+
+    //todo: this currently fails since we don't yet set operation type properly
+    Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+      1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1'"));
+    Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
+      1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_partition='p=two' and ws_operation_type='d' and ws_table='tab1'"));
+    Assert.assertEquals("COMPLETED_TXN_COMPONENTS mismatch: " + TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+      4, TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_table='tab1' and ctc_partition is not null"));
+  }
 }
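
The interleaving pattern described in the class javadoc reduces to the sketch below. This is illustrative only: table T and the queries are hypothetical, and conf/ctx/driver stand for the fixtures set up in TestDbTxnManager2.

    HiveTxnManager txnMgr1 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
    HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);

    txnMgr1.openTxn("writer1");
    driver.compileAndRespond("update T set b = 1 where p = 'x'");
    txnMgr1.acquireLocks(driver.getPlan(), ctx, "writer1");

    txnMgr2.openTxn("writer2");
    driver.compileAndRespond("update T set b = 2 where p = 'x'");
    // false => enqueue without blocking, so a single thread can keep driving both txns
    ((DbTxnManager) txnMgr2).acquireLocks(driver.getPlan(), ctx, "writer2", false);

    txnMgr1.commitTxn();     // first committer wins
    try {
      txnMgr2.commitTxn();   // write-write conflict on the same partition
    } catch (LockException e) {
      // expected: "Aborting [txnid:...] due to a write conflict ... committed by [txnid:...]"
    }

Because both transaction managers live in one thread, the test controls exactly which operation happens when, which is what makes these scenarios reproducible against the embedded Derby metastore.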

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index a247065..1578bfb 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.hive.metastore.api.LockLevel;
 import org.apache.hadoop.hive.metastore.api.LockRequest;
 import org.apache.hadoop.hive.metastore.api.LockResponse;
 import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
+import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
@@ -261,6 +263,8 @@ public class TestCleaner extends CompactorTest {
     List<LockComponent> components = new ArrayList<LockComponent>(1);
     components.add(comp);
     LockRequest req = new LockRequest(components, "me", "localhost");
+    OpenTxnsResponse resp = txnHandler.openTxns(new OpenTxnRequest(1, "Dracula", "Transylvania"));
+    req.setTxnid(resp.getTxn_ids().get(0));
     LockResponse res = txnHandler.lock(req);
 
     startCleaner();
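
The two added lines tie the test's blocking lock to an open transaction, presumably so the lock stays live from the cleaner's perspective under the new HIVE-13395 semantics. In isolation (names and values taken from the diff above):

    // Sketch: open one txn and associate the lock request with it before locking.
    OpenTxnsResponse resp = txnHandler.openTxns(new OpenTxnRequest(1, "Dracula", "Transylvania"));
    LockRequest req = new LockRequest(components, "me", "localhost");
    req.setTxnid(resp.getTxn_ids().get(0));   // tie the lock to the open txn
    LockResponse res = txnHandler.lock(req);  // lock now persists until the txn ends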


[08/13] hive git commit: HIVE-13656 : need to set direct memory limit higher in LlapServiceDriver for certain edge case configurations (Sergey Shelukhin, reviewed by Vikram Dixit K and Siddharth Seth)

Posted by jd...@apache.org.
HIVE-13656 : need to set direct memory limit higher in LlapServiceDriver for certain edge case configurations (Sergey Shelukhin, reviewed by Vikram Dixit K and Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3517a99e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3517a99e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3517a99e

Branch: refs/heads/llap
Commit: 3517a99edde061596d62b41339bacb5aac0e8290
Parents: eb2c54b
Author: Sergey Shelukhin <se...@apache.org>
Authored: Thu May 5 17:01:47 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Thu May 5 17:02:36 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hive/llap/cli/LlapServiceDriver.java | 21 +++++++++++---------
 llap-server/src/main/resources/package.py       |  6 +++++-
 2 files changed, 17 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/3517a99e/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
index de6d9b8..006f70f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
@@ -236,20 +236,22 @@ public class LlapServiceDriver {
           String.valueOf(options.getIoThreads()));
     }
 
+    long cache = -1, xmx = -1;
     if (options.getCache() != -1) {
-      conf.set(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname,
-          Long.toString(options.getCache()));
+      cache = options.getCache();
+      conf.set(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname, Long.toString(cache));
       propsDirectOptions.setProperty(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname,
-          Long.toString(options.getCache()));
+          Long.toString(cache));
     }
 
     if (options.getXmx() != -1) {
       // Needs more explanation here
-      // Xmx is not the max heap value in JDK8
-      // You need to subtract 50% of the survivor fraction from this, to get actual usable memory before it goes into GC
-      long xmx = (long) (options.getXmx() / (1024 * 1024));
+      // Xmx is not the max heap value in JDK8. You need to subtract 50% of the survivor fraction
+      // from this, to get actual usable memory before it goes into GC
+      xmx = (long) (options.getXmx() / (1024 * 1024));
       conf.setLong(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, xmx);
-      propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, String.valueOf(xmx));
+      propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname,
+          String.valueOf(xmx));
     }
 
     if (options.getLlapQueueName() != null && !options.getLlapQueueName().isEmpty()) {
@@ -258,8 +260,6 @@ public class LlapServiceDriver {
           .setProperty(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, options.getLlapQueueName());
     }
 
-
-
     URL logger = conf.getResource(LlapDaemon.LOG4j2_PROPERTIES_FILE);
 
     if (null == logger) {
@@ -460,6 +460,9 @@ public class LlapServiceDriver {
     configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
         conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, -1));
 
+    long maxDirect = (xmx > 0 && cache > 0 && xmx < cache * 1.25) ? (long)(cache * 1.25) : -1;
+    configs.put("max_direct_memory", Long.toString(maxDirect));
+
     FSDataOutputStream os = lfs.create(new Path(tmpDir, "config.json"));
     OutputStreamWriter w = new OutputStreamWriter(os);
     configs.write(w);
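
The maxDirect rule above raises the direct-memory ceiling whenever the heap would otherwise be smaller than 1.25x the cache; the JVM's default MaxDirectMemorySize roughly tracks -Xmx, so in that configuration the default cap could not hold the off-heap cache. package.py (next diff) turns the resulting max_direct_memory value into an -XX:MaxDirectMemorySize flag. A standalone restatement with worked values, assuming xmx and cache are expressed in the same unit:

    static long maxDirect(long xmx, long cache) {
      // Raise the direct-memory limit only when both values are set and the
      // heap alone would not cover the off-heap cache plus 25% headroom.
      return (xmx > 0 && cache > 0 && xmx < cache * 1.25) ? (long) (cache * 1.25) : -1;
    }
    // maxDirect(4096, 4096) == 5120  -> package.py emits -XX:MaxDirectMemorySize=5120
    // maxDirect(8192, 4096) == -1    -> no flag added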

http://git-wip-us.apache.org/repos/asf/hive/blob/3517a99e/llap-server/src/main/resources/package.py
----------------------------------------------------------------------
diff --git a/llap-server/src/main/resources/package.py b/llap-server/src/main/resources/package.py
index 63c0ef1..94c9d1a 100644
--- a/llap-server/src/main/resources/package.py
+++ b/llap-server/src/main/resources/package.py
@@ -101,6 +101,10 @@ def main(args):
 		return
 	config = json_parse(open(join(input, "config.json")).read())
 	java_home = config["java.home"]
+	max_direct_memory = config["max_direct_memory"]
+	daemon_args = args.args
+	if max_direct_memory > 0:
+		daemon_args = " -XX:MaxDirectMemorySize=%s %s" % (max_direct_memory, daemon_args)
 	resource = LlapResource(config)
 	# 5% container failure every monkey_interval seconds
 	monkey_percentage = 5 # 5%
@@ -114,7 +118,7 @@ def main(args):
 		"hadoop_home" : os.getenv("HADOOP_HOME"),
 		"java_home" : java_home,
 		"name" : resource.clusterName,
-		"daemon_args" : args.args,
+		"daemon_args" : daemon_args,
 		"daemon_loglevel" : args.loglevel,
 		"queue.string" : resource.queueString,
 		"monkey_interval" : args.chaosmonkey,


[04/13] hive git commit: HIVE-13637: Fold CASE into NVL when CBO optimized the plan (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)

Posted by jd...@apache.org.
HIVE-13637: Fold CASE into NVL when CBO optimized the plan (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/da82819b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/da82819b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/da82819b

Branch: refs/heads/llap
Commit: da82819bc112589e0d96874947c942e834681ed2
Parents: 10d0549
Author: Jesus Camacho Rodriguez <jc...@apache.org>
Authored: Wed May 4 01:27:30 2016 +0100
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Thu May 5 22:13:10 2016 +0100

----------------------------------------------------------------------
 .../calcite/translator/JoinTypeCheckCtx.java    |  2 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  | 17 ++++++++-----
 .../hadoop/hive/ql/parse/TypeCheckCtx.java      | 19 +++++++++-----
 .../hive/ql/parse/TypeCheckProcFactory.java     | 26 ++++++++++++++++++++
 .../queries/clientpositive/constantPropWhen.q   |  2 ++
 5 files changed, 53 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/da82819b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/JoinTypeCheckCtx.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/JoinTypeCheckCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/JoinTypeCheckCtx.java
index dccd1d9..f166bb6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/JoinTypeCheckCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/JoinTypeCheckCtx.java
@@ -53,7 +53,7 @@ public class JoinTypeCheckCtx extends TypeCheckCtx {
 
   public JoinTypeCheckCtx(RowResolver leftRR, RowResolver rightRR, JoinType hiveJoinType)
       throws SemanticException {
-    super(RowResolver.getCombinedRR(leftRR, rightRR), true, false, false, false, false, false, false,
+    super(RowResolver.getCombinedRR(leftRR, rightRR), true, false, false, false, false, false, false, false,
         false, false);
     this.inputRRLst = ImmutableList.of(leftRR, rightRR);
     this.outerJoin = (hiveJoinType == JoinType.LEFTOUTER) || (hiveJoinType == JoinType.RIGHTOUTER)

http://git-wip-us.apache.org/repos/asf/hive/blob/da82819b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 2983d38..f79a525 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -3143,8 +3143,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     OpParseContext inputCtx = opParseCtx.get(input);
     RowResolver inputRR = inputCtx.getRowResolver();
     Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
-        new FilterDesc(genExprNodeDesc(condn, inputRR, useCaching), false), new RowSchema(
-            inputRR.getColumnInfos()), input), inputRR);
+        new FilterDesc(genExprNodeDesc(condn, inputRR, useCaching, isCBOExecuted()), false),
+        new RowSchema(inputRR.getColumnInfos()), input), inputRR);
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Created Filter Plan for " + qb.getId() + " row schema: "
@@ -4146,7 +4146,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
              expr, col_list, null, inputRR, starRR, pos, out_rwsch, qb.getAliases(), false);
       } else {
         // Case when this is an expression
-        TypeCheckCtx tcCtx = new TypeCheckCtx(inputRR);
+        TypeCheckCtx tcCtx = new TypeCheckCtx(inputRR, true, isCBOExecuted());
         // We allow stateful functions in the SELECT list (but nowhere else)
         tcCtx.setAllowStatefulFunctions(true);
         tcCtx.setAllowDistinctFunctions(false);
@@ -7777,7 +7777,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       List<ASTNode> expressions = joinTree.getExpressions().get(i);
       joinKeys[i] = new ExprNodeDesc[expressions.size()];
       for (int j = 0; j < joinKeys[i].length; j++) {
-        joinKeys[i][j] = genExprNodeDesc(expressions.get(j), inputRR);
+        joinKeys[i][j] = genExprNodeDesc(expressions.get(j), inputRR, true, isCBOExecuted());
       }
     }
     // Type checking and implicit type conversion for join keys
@@ -10999,12 +10999,17 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       throws SemanticException {
     // Since the user didn't supply a customized type-checking context,
     // use default settings.
-    return genExprNodeDesc(expr, input, true);
+    return genExprNodeDesc(expr, input, true, false);
   }
 
   public ExprNodeDesc genExprNodeDesc(ASTNode expr, RowResolver input, boolean useCaching)
       throws SemanticException {
-    TypeCheckCtx tcCtx = new TypeCheckCtx(input, useCaching);
+    return genExprNodeDesc(expr, input, useCaching, false);
+  }
+
+  public ExprNodeDesc genExprNodeDesc(ASTNode expr, RowResolver input, boolean useCaching,
+      boolean foldExpr) throws SemanticException {
+    TypeCheckCtx tcCtx = new TypeCheckCtx(input, useCaching, foldExpr);
     return genExprNodeDesc(expr, input, tcCtx);
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/da82819b/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckCtx.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckCtx.java
index de1c043..02896ff 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckCtx.java
@@ -37,6 +37,8 @@ public class TypeCheckCtx implements NodeProcessorCtx {
 
   private final boolean useCaching;
 
+  private final boolean foldExpr;
+
   /**
    * Receives translations which will need to be applied during unparse.
    */
@@ -79,20 +81,21 @@ public class TypeCheckCtx implements NodeProcessorCtx {
    *          The input row resolver of the previous operator.
    */
   public TypeCheckCtx(RowResolver inputRR) {
-    this(inputRR, true);
+    this(inputRR, true, false);
   }
 
-  public TypeCheckCtx(RowResolver inputRR, boolean useCaching) {
-    this(inputRR, useCaching, false, true, true, true, true, true, true, true);
+  public TypeCheckCtx(RowResolver inputRR, boolean useCaching, boolean foldExpr) {
+    this(inputRR, useCaching, foldExpr, false, true, true, true, true, true, true, true);
   }
 
-  public TypeCheckCtx(RowResolver inputRR, boolean useCaching, boolean allowStatefulFunctions,
-      boolean allowDistinctFunctions, boolean allowGBExprElimination, boolean allowAllColRef,
-      boolean allowFunctionStar, boolean allowWindowing,
+  public TypeCheckCtx(RowResolver inputRR, boolean useCaching, boolean foldExpr,
+      boolean allowStatefulFunctions, boolean allowDistinctFunctions, boolean allowGBExprElimination,
+      boolean allowAllColRef, boolean allowFunctionStar, boolean allowWindowing,
       boolean allowIndexExpr, boolean allowSubQueryExpr) {
     setInputRR(inputRR);
     error = null;
     this.useCaching = useCaching;
+    this.foldExpr = foldExpr;
     this.allowStatefulFunctions = allowStatefulFunctions;
     this.allowDistinctFunctions = allowDistinctFunctions;
     this.allowGBExprElimination = allowGBExprElimination;
@@ -209,4 +212,8 @@ public class TypeCheckCtx implements NodeProcessorCtx {
   public boolean isUseCaching() {
     return useCaching;
   }
+
+  public boolean isFoldExpr() {
+    return foldExpr;
+  }
 }
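A minimal sketch of the new flag from a caller's perspective (illustrative; the two-argument constructor keeps its old behavior because it defaults foldExpr to false):

    // Opt in to expression folding for this type-check pass.
    TypeCheckCtx tcCtx = new TypeCheckCtx(inputRR, /* useCaching */ true, /* foldExpr */ true);
    assert tcCtx.isFoldExpr();  // consulted by TypeCheckProcFactory below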

http://git-wip-us.apache.org/repos/asf/hive/blob/da82819b/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java
index da236d5..ceeb9b4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java
@@ -61,9 +61,12 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.udf.SettableUDF;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBaseCompare;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFNvl;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNot;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFWhen;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -1055,6 +1058,14 @@ public class TypeCheckProcFactory {
           }
           desc = ExprNodeGenericFuncDesc.newInstance(genericUDF, funcText,
               childrenList);
+        } else if (ctx.isFoldExpr() && canConvertIntoNvl(genericUDF, children)) {
+          // Rewrite CASE into NVL
+          desc = ExprNodeGenericFuncDesc.newInstance(new GenericUDFNvl(),
+                  Lists.newArrayList(children.get(0), new ExprNodeConstantDesc(false)));
+          if (Boolean.FALSE.equals(((ExprNodeConstantDesc) children.get(1)).getValue())) {
+            desc = ExprNodeGenericFuncDesc.newInstance(new GenericUDFOPNot(),
+                    Lists.newArrayList(desc));
+          }
         } else {
           desc = ExprNodeGenericFuncDesc.newInstance(genericUDF, funcText,
               children);
@@ -1072,6 +1083,21 @@ public class TypeCheckProcFactory {
       return desc;
     }
 
+    private boolean canConvertIntoNvl(GenericUDF genericUDF, ArrayList<ExprNodeDesc> children) {
+      if (genericUDF instanceof GenericUDFWhen && children.size() == 3 &&
+              children.get(1) instanceof ExprNodeConstantDesc &&
+              children.get(2) instanceof ExprNodeConstantDesc) {
+        ExprNodeConstantDesc constThen = (ExprNodeConstantDesc) children.get(1);
+        ExprNodeConstantDesc constElse = (ExprNodeConstantDesc) children.get(2);
+        Object thenVal = constThen.getValue();
+        Object elseVal = constElse.getValue();
+        if (thenVal instanceof Boolean && elseVal instanceof Boolean) {
+          return true;
+        }
+      }
+      return false;
+    }
+
     /**
      * Returns true if des is a descendant of ans (ancestor)
      */
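
Concretely, when the WHEN has a single condition and constant Boolean THEN/ELSE arms, the new branch performs this rewrite (a sketch restating the code above, with cond standing for children.get(0)):

    // CASE WHEN cond THEN TRUE  ELSE FALSE END  =>  NVL(cond, FALSE)
    // CASE WHEN cond THEN FALSE ELSE TRUE  END  =>  NOT NVL(cond, FALSE)
    ExprNodeDesc nvl = ExprNodeGenericFuncDesc.newInstance(new GenericUDFNvl(),
        Lists.newArrayList(cond, new ExprNodeConstantDesc(false)));

The NVL keeps the two forms equivalent for NULL input: CASE falls through to the ELSE arm when cond is NULL, and NVL coerces that NULL to FALSE (negated in the second form).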

http://git-wip-us.apache.org/repos/asf/hive/blob/da82819b/ql/src/test/queries/clientpositive/constantPropWhen.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/constantPropWhen.q b/ql/src/test/queries/clientpositive/constantPropWhen.q
index c1d4885..03bfd54 100644
--- a/ql/src/test/queries/clientpositive/constantPropWhen.q
+++ b/ql/src/test/queries/clientpositive/constantPropWhen.q
@@ -1,4 +1,5 @@
 set hive.mapred.mode=nonstrict;
+set hive.optimize.constant.propagation=false;
 
 drop table test_1; 
 
@@ -24,6 +25,7 @@ SELECT cast(CASE id when id2 THEN TRUE ELSE FALSE END AS BOOLEAN) AS b FROM test
 
 
 set hive.cbo.enable=false;
+set hive.optimize.constant.propagation=true;
 
 explain SELECT cast(CASE WHEN id = id2 THEN FALSE ELSE TRUE END AS BOOLEAN) AS b FROM test_1; 
 
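Presumably the added settings let this qfile cover both plan shapes: with constant propagation disabled, the new CASE-to-NVL rewrite in TypeCheckProcFactory is visible in the explain output, while re-enabling propagation (with CBO off) exercises the pre-existing folding path.
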


[12/13] hive git commit: HIVE-13507: Improved logging for ptest (Siddharth Seth, reviewed by Szehon Ho)

Posted by jd...@apache.org.
HIVE-13507: Improved logging for ptest (Siddharth Seth, reviewed by Szehon Ho)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3f07bfce
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3f07bfce
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3f07bfce

Branch: refs/heads/llap
Commit: 3f07bfcefce775dc77eca13cf623ccde94ff2494
Parents: 2b1e273
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri May 6 10:06:25 2016 -0500
Committer: Sergio Pena <se...@cloudera.com>
Committed: Fri May 6 10:06:25 2016 -0500

----------------------------------------------------------------------
 .../hive/ptest/execution/ExecutionPhase.java    |  2 +
 .../hive/ptest/execution/HostExecutor.java      | 48 ++++++++++++++++++--
 .../hive/ptest/execution/LocalCommand.java      | 31 +++++++++++--
 .../apache/hive/ptest/execution/PrepPhase.java  |  1 +
 .../apache/hive/ptest/execution/conf/Host.java  |  3 ++
 5 files changed, 76 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/3f07bfce/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/ExecutionPhase.java
----------------------------------------------------------------------
diff --git a/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/ExecutionPhase.java b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/ExecutionPhase.java
index 3026ea0..6063afc 100644
--- a/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/ExecutionPhase.java
+++ b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/ExecutionPhase.java
@@ -86,6 +86,8 @@ public class ExecutionPhase extends Phase {
         isolatedWorkQueue.add(batch);
       }
     }
+    logger.info("ParallelWorkQueueSize={}, IsolatedWorkQueueSize={}", parallelWorkQueue.size(),
+        isolatedWorkQueue.size());
     try {
       int expectedNumHosts = hostExecutors.size();
       initalizeHosts();

http://git-wip-us.apache.org/repos/asf/hive/blob/3f07bfce/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/HostExecutor.java
----------------------------------------------------------------------
diff --git a/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/HostExecutor.java b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/HostExecutor.java
index b05d2c2..735b261 100644
--- a/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/HostExecutor.java
+++ b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/HostExecutor.java
@@ -29,6 +29,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.base.Stopwatch;
 import org.apache.hive.ptest.execution.conf.Host;
 import org.apache.hive.ptest.execution.conf.TestBatch;
 import org.apache.hive.ptest.execution.ssh.RSyncCommand;
@@ -65,6 +66,8 @@ class HostExecutor {
   private final File mFailedTestLogDir;
   private final long mNumPollSeconds;
   private volatile boolean mShutdown;
+  private int numParallelBatchesProcessed = 0;
+  private int numIsolatedBatchesProcessed = 0;
   
   HostExecutor(Host host, String privateKey, ListeningExecutorService executor,
       SSHCommandExecutor sshCommandExecutor,
@@ -100,7 +103,18 @@ class HostExecutor {
     return mExecutor.submit(new Callable<Void>() {
       @Override
       public Void call() throws Exception {
-        executeTests(parallelWorkQueue, isolatedWorkQueue, failedTestResults);
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        mLogger.info("Starting SubmitTests on host {}", getHost());
+        try {
+          executeTests(parallelWorkQueue, isolatedWorkQueue, failedTestResults);
+        } finally {
+          stopwatch.stop();
+          mLogger.info("Finishing submitTests on host: {}. ElapsedTime(seconds)={}," +
+              " NumParallelBatchesProcessed={}, NumIsolatedBatchesProcessed={}",
+              new Object[]{getHost().toString(),
+                  stopwatch.elapsed(TimeUnit.SECONDS), numParallelBatchesProcessed,
+                  numIsolatedBatchesProcessed});
+        }
         return null;
       }
 
@@ -143,6 +157,7 @@ class HostExecutor {
         @Override
         public Void call() throws Exception {
           TestBatch batch = null;
+          Stopwatch sw = Stopwatch.createUnstarted();
           try {
             do {
               batch = parallelWorkQueue.poll(mNumPollSeconds, TimeUnit.SECONDS);
@@ -151,8 +166,16 @@ class HostExecutor {
                 return null;
               }
               if(batch != null) {
-                if(!executeTestBatch(drone, batch, failedTestResults)) {
-                  failedTestResults.add(batch);
+                numParallelBatchesProcessed++;
+                sw.reset().start();
+                try {
+                  if (!executeTestBatch(drone, batch, failedTestResults)) {
+                    failedTestResults.add(batch);
+                  }
+                } finally {
+                  sw.stop();
+                  mLogger.info("Finished processing parallel batch [{}] on host {}. ElapsedTime(seconds)={}",
+                      new Object[]{batch.getName(), getHost().toShortString(), sw.elapsed(TimeUnit.SECONDS)});
                 }
               }
             } while(!mShutdown && !parallelWorkQueue.isEmpty());
@@ -176,12 +199,22 @@ class HostExecutor {
     mLogger.info("Starting isolated execution on " + mHost.getName());
     for(Drone drone : ImmutableList.copyOf(mDrones)) {
       TestBatch batch = null;
+      Stopwatch sw = Stopwatch.createUnstarted();
       try {
         do {
+
           batch = isolatedWorkQueue.poll(mNumPollSeconds, TimeUnit.SECONDS);
           if(batch != null) {
-            if(!executeTestBatch(drone, batch, failedTestResults)) {
-              failedTestResults.add(batch);
+            numIsolatedBatchesProcessed++;
+            sw.reset().start();
+            try {
+              if (!executeTestBatch(drone, batch, failedTestResults)) {
+                failedTestResults.add(batch);
+              }
+            } finally {
+              sw.stop();
+              mLogger.info("Finished processing isolated batch [{}] on host {}. ElapsedTime(seconds)={}",
+                  new Object[]{batch.getName(), getHost().toShortString(), sw.elapsed(TimeUnit.SECONDS)});
             }
           }
         } while(!mShutdown && !isolatedWorkQueue.isEmpty());
@@ -215,10 +248,15 @@ class HostExecutor {
     Templates.writeTemplateResult("batch-exec.vm", script, templateVariables);
     copyToDroneFromLocal(drone, script.getAbsolutePath(), "$localDir/$instanceName/scratch/" + scriptName);
     script.delete();
+    Stopwatch sw = Stopwatch.createStarted();
     mLogger.info(drone + " executing " + batch + " with " + command);
     RemoteCommandResult sshResult = new SSHCommand(mSSHCommandExecutor, drone.getPrivateKey(), drone.getUser(),
         drone.getHost(), drone.getInstance(), command, true).
         call();
+    sw.stop();
+    mLogger.info("Completed executing tests for batch [{}] on host {}. ElapsedTime(seconds)={}",
+        new Object[]{batch.getName(),
+            getHost().toShortString(), sw.elapsed(TimeUnit.SECONDS)});
     File batchLogDir = null;
     if(sshResult.getExitCode() == Constants.EXIT_CODE_UNKNOWN) {
       throw new AbortDroneException("Drone " + drone.toString() + " exited with " +

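The timing added above follows one pattern throughout the executor; a minimal standalone sketch of it (Guava Stopwatch, illustrative names):

    import java.util.concurrent.TimeUnit;
    import com.google.common.base.Stopwatch;

    Stopwatch sw = Stopwatch.createUnstarted();
    sw.reset().start();            // restart per unit of work
    try {
      runBatch();                  // stands in for executeTestBatch(...)
    } finally {
      sw.stop();
      logger.info("Finished batch. ElapsedTime(seconds)={}", sw.elapsed(TimeUnit.SECONDS));
    }
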
http://git-wip-us.apache.org/repos/asf/hive/blob/3f07bfce/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/LocalCommand.java
----------------------------------------------------------------------
diff --git a/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/LocalCommand.java b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/LocalCommand.java
index ec99656..de9fe68 100644
--- a/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/LocalCommand.java
+++ b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/LocalCommand.java
@@ -22,17 +22,28 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.base.Stopwatch;
 import org.slf4j.Logger;
 
 public class LocalCommand {
 
+  private static final AtomicInteger localCommandCounter = new AtomicInteger(0);
+
+  private final Logger logger;
   private final Process process;
   private final StreamReader streamReader;
   private Integer exitCode;
+  private final int commandId;
+  private final Stopwatch stopwatch = Stopwatch.createUnstarted();
 
   public LocalCommand(Logger logger, OutputPolicy outputPolicy, String command) throws IOException {
-    logger.info("Starting " + command);
+    this.commandId = localCommandCounter.incrementAndGet();
+    this.logger = logger;
+    logger.info("Starting LocalCommandId={}: {}" + commandId, command);
+    stopwatch.start();
     process = new ProcessBuilder().command(new String[] {"bash", "-c", command}).redirectErrorStream(true).start();
     streamReader = new StreamReader(outputPolicy, process.getInputStream());
     streamReader.setName("StreamReader-[" + command + "]");
@@ -42,13 +53,25 @@ public class LocalCommand {
 
   public int getExitCode() throws InterruptedException {
     synchronized (process) {
-      if(exitCode == null) {
+      awaitProcessCompletion();
+      return exitCode;
+    }
+  }
+
+  private void awaitProcessCompletion() throws InterruptedException {
+    synchronized (process) {
+      if (exitCode == null) {
         exitCode = process.waitFor();
+        if (stopwatch.isRunning()) {
+          stopwatch.stop();
+          logger.info("Finished LocalCommandId={}. ElapsedTime(seconds)={}", commandId,
+              stopwatch.elapsed(
+                  TimeUnit.SECONDS));
+        }
       }
-      return exitCode;
     }
   }
-  
+
   public void kill() {
     synchronized (process) {
       process.destroy();

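One subtlety in the code above: getExitCode() synchronizes on process and then calls awaitProcessCompletion(), which synchronizes on the same monitor. Java intrinsic locks are reentrant, so the nesting is safe, and the null check ensures process.waitFor() and the elapsed-time log run at most once.
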
http://git-wip-us.apache.org/repos/asf/hive/blob/3f07bfce/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PrepPhase.java
----------------------------------------------------------------------
diff --git a/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PrepPhase.java b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PrepPhase.java
index 825f0c0..8fef413 100644
--- a/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PrepPhase.java
+++ b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/PrepPhase.java
@@ -62,6 +62,7 @@ public class PrepPhase extends Phase {
     // source prep
     start = System.currentTimeMillis();
     File sourcePrepScript = new File(mScratchDir, "source-prep.sh");
+    logger.info("Writing {} from template", sourcePrepScript);
     Templates.writeTemplateResult("source-prep.vm", sourcePrepScript, getTemplateDefaults());
     execLocally("bash " + sourcePrepScript.getPath());
     logger.debug("Deleting " + sourcePrepScript + ": " + sourcePrepScript.delete());

http://git-wip-us.apache.org/repos/asf/hive/blob/3f07bfce/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/conf/Host.java
----------------------------------------------------------------------
diff --git a/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/conf/Host.java b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/conf/Host.java
index c1216c1..a56824c 100644
--- a/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/conf/Host.java
+++ b/testutils/ptest2/src/main/java/org/apache/hive/ptest/execution/conf/Host.java
@@ -47,6 +47,9 @@ public class Host {
   public String[] getLocalDirectories() {
     return localDirectories;
   }
+  public String toShortString() {
+    return name;
+  }
   @Override
   public String toString() {
     return "Host [name=" + name + ", user=" + user + ", threads=" + threads


[11/13] hive git commit: HIVE-13679: Pass diagnostic message to failure hooks (Jimmy Xiang, reviewed by Aihua Xu)

Posted by jd...@apache.org.
HIVE-13679: Pass diagnostic message to failure hooks (Jimmy Xiang, reviewed by Aihua Xu)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2b1e273e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2b1e273e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2b1e273e

Branch: refs/heads/llap
Commit: 2b1e273e44fe367c12167409e8552efa2770ae7e
Parents: b870d52
Author: Jimmy Xiang <jx...@apache.org>
Authored: Tue May 3 14:48:09 2016 -0700
Committer: Jimmy Xiang <jx...@apache.org>
Committed: Fri May 6 07:41:43 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hive/ql/Driver.java  | 11 ++++++----
 .../org/apache/hadoop/hive/ql/exec/Task.java    | 21 ++++++++++++++++----
 .../apache/hadoop/hive/ql/exec/TaskResult.java  |  7 +++++--
 .../apache/hadoop/hive/ql/exec/TaskRunner.java  |  5 ++++-
 .../hive/ql/exec/mr/HadoopJobExecHelper.java    |  1 +
 .../hadoop/hive/ql/exec/mr/JobDebugger.java     | 18 +++++++++++------
 6 files changed, 46 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/2b1e273e/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 6a610cb..3fecc5c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -118,6 +118,7 @@ import org.apache.hive.common.util.ShutdownHookManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
 
@@ -1598,7 +1599,8 @@ public class Driver implements CommandProcessor {
 
           } else {
             setErrorMsgAndDetail(exitVal, result.getTaskError(), tsk);
-            invokeFailureHooks(perfLogger, hookContext, result.getTaskError());
+            invokeFailureHooks(perfLogger, hookContext,
+              errorMessage + Strings.nullToEmpty(tsk.getDiagnosticsMessage()), result.getTaskError());
             SQLState = "08S01";
             console.printError(errorMessage);
             driverCxt.shutdown();
@@ -1634,7 +1636,7 @@ public class Driver implements CommandProcessor {
       if (driverCxt.isShutdown()) {
         SQLState = "HY008";
         errorMessage = "FAILED: Operation cancelled";
-        invokeFailureHooks(perfLogger, hookContext, null);
+        invokeFailureHooks(perfLogger, hookContext, errorMessage, null);
         console.printError(errorMessage);
         return 1000;
       }
@@ -1691,7 +1693,7 @@ public class Driver implements CommandProcessor {
       errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
       if (hookContext != null) {
         try {
-          invokeFailureHooks(perfLogger, hookContext, e);
+          invokeFailureHooks(perfLogger, hookContext, errorMessage, e);
         } catch (Exception t) {
           LOG.warn("Failed to invoke failure hook", t);
         }
@@ -1790,7 +1792,8 @@ public class Driver implements CommandProcessor {
     }
   }
 
-  private void invokeFailureHooks(PerfLogger perfLogger, HookContext hookContext, Throwable exception) throws Exception {
+  private void invokeFailureHooks(PerfLogger perfLogger,
+      HookContext hookContext, String errorMessage, Throwable exception) throws Exception {
     hookContext.setHookType(HookContext.HookType.ON_FAILURE_HOOK);
     hookContext.setErrorMessage(errorMessage);
     hookContext.setException(exception);

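On the consuming side, a failure hook can now see the task diagnostics folded into the message. A hedged sketch (the getter mirrors the setter shown above, and the hook would be registered via hive.exec.failure.hooks; treat both as assumptions):

    // Hypothetical hook, for illustration only.
    public class LoggingFailureHook implements ExecuteWithHookContext {
      @Override
      public void run(HookContext hookContext) throws Exception {
        // errorMessage now carries the task's diagnostic message, when present.
        System.err.println("Query failed: " + hookContext.getErrorMessage());
      }
    }
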
http://git-wip-us.apache.org/repos/asf/hive/blob/2b1e273e/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index 34bdafd..eeaa543 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -27,10 +27,12 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.*;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.QueryDisplay;
+import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -40,6 +42,8 @@ import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Task implementation.
@@ -84,8 +88,17 @@ public abstract class Task<T extends Serializable> implements Serializable, Node
   protected T work;
   private TaskState taskState = TaskState.CREATED;
   private String statusMessage;
+  private String diagnosticMesg;
   private transient boolean fetchSource;
 
+  public void setDiagnosticMessage(String diagnosticMesg) {
+    this.diagnosticMesg = diagnosticMesg;
+  }
+
+  public String getDiagnosticsMessage() {
+    return diagnosticMesg;
+  }
+
   public void setStatusMessage(String statusMessage) {
     this.statusMessage = statusMessage;
     updateStatusInQueryDisplay();
@@ -321,7 +334,7 @@ public abstract class Task<T extends Serializable> implements Serializable, Node
     return ret;
   }
 
-  @SuppressWarnings("unchecked")
+  @SuppressWarnings({"unchecked", "rawtypes"})
   public static List<Task<? extends Serializable>>
       findLeafs(List<Task<? extends Serializable>> rootTasks) {
     final List<Task<? extends Serializable>> leafTasks = new ArrayList<Task<?>>();

http://git-wip-us.apache.org/repos/asf/hive/blob/2b1e273e/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskResult.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskResult.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskResult.java
index def9389..3c4ee17 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskResult.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskResult.java
@@ -37,10 +37,13 @@ public class TaskResult {
     this.exitVal = exitVal;
     setRunning(false);
   }
-  public void setExitVal(int exitVal, Throwable taskError) {
-    this.setExitVal(exitVal);
+  public void setTaskError(Throwable taskError) {
     this.taskError = taskError;
   }
+  public void setExitVal(int exitVal, Throwable taskError) {
+    setExitVal(exitVal);
+    setTaskError(taskError);
+  }
 
   public int getExitVal() {
     return exitVal;

http://git-wip-us.apache.org/repos/asf/hive/blob/2b1e273e/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
index 81f6db0..a596e92 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
@@ -104,7 +104,10 @@ public class TaskRunner extends Thread {
       }
       LOG.error("Error in executeTask", t);
     }
-    result.setExitVal(exitVal, tsk.getException());
+    result.setExitVal(exitVal);
+    if (tsk.getException() != null) {
+      result.setTaskError(tsk.getException());
+    }
   }
 
   public static long getTaskRunnerID () {

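Splitting setTaskError() out of setExitVal() (see TaskResult above) lets TaskRunner record the exit value unconditionally while leaving any previously recorded error intact when the task finished without an exception, instead of overwriting it with null.
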
http://git-wip-us.apache.org/repos/asf/hive/blob/2b1e273e/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
index 11f5cfd..c15316bb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
@@ -574,6 +574,7 @@ public class HadoopJobExecHelper {
           Thread t = new Thread(jd);
           t.start();
           t.join(HiveConf.getIntVar(job, HiveConf.ConfVars.JOB_DEBUG_TIMEOUT));
+          task.setDiagnosticMessage(jd.getDiagnosticMesg());
           int ec = jd.getErrorCode();
           if (ec > 0) {
             returnVal = ec;

http://git-wip-us.apache.org/repos/asf/hive/blob/2b1e273e/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/JobDebugger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/JobDebugger.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/JobDebugger.java
index 6e4e3bf..d320536 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/JobDebugger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/JobDebugger.java
@@ -53,6 +53,7 @@ public class JobDebugger implements Runnable {
   private final Map<String, Integer> failures = new HashMap<String, Integer>();
   private final Set<String> successes = new HashSet<String>(); // Successful task ID's
   private final Map<String, TaskInfo> taskIdToInfo = new HashMap<String, TaskInfo>();
+  private String diagnosticMesg;
   private int maxFailures = 0;
 
   // Used for showJobFailDebugInfo
@@ -115,7 +116,7 @@ public class JobDebugger implements Runnable {
 
   public void run() {
     try {
-      showJobFailDebugInfo();
+      diagnosticMesg = showJobFailDebugInfo();
     } catch (IOException e) {
       console.printError(e.getMessage());
     }
@@ -216,8 +217,7 @@ public class JobDebugger implements Runnable {
     }
   }
 
-  @SuppressWarnings("deprecation")
-  private void showJobFailDebugInfo() throws IOException {
+  private String showJobFailDebugInfo() throws IOException {
     console.printError("Error during job, obtaining debugging information...");
     if (!conf.get("mapred.job.tracker", "local").equals("local")) {
       // Show Tracking URL for remotely running jobs.
@@ -241,7 +241,7 @@ public class JobDebugger implements Runnable {
     }
 
     if (failures.keySet().size() == 0) {
-      return;
+      return null;
     }
     // Find the highest failure count
     computeMaxFailures() ;
@@ -255,6 +255,7 @@ public class JobDebugger implements Runnable {
           + e.getMessage());
     }
 
+    String msg = null;
     for (String task : failures.keySet()) {
       if (failures.get(task).intValue() == maxFailures) {
         TaskInfo ti = taskIdToInfo.get(task);
@@ -303,14 +304,19 @@ public class JobDebugger implements Runnable {
           for (String mesg : diagMesgs) {
             sb.append(mesg + "\n");
           }
-          console.printError(sb.toString());
+          msg = sb.toString();
+          console.printError(msg);
         }
 
         // Only print out one task because that's good enough for debugging.
         break;
       }
     }
-    return;
+    return msg;
+  }
+
+  public String getDiagnosticMesg() {
+    return diagnosticMesg;
   }
 
   public int getErrorCode() {

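Taken together, the diagnostic now flows end to end: JobDebugger.showJobFailDebugInfo() returns the message for the most-failed task, HadoopJobExecHelper stores it on the Task via setDiagnosticMessage(), and Driver appends getDiagnosticsMessage() to the error message handed to the failure hooks.
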

[06/13] hive git commit: HIVE-13393: Beeline: Print help message for the --incremental option (Vaibhav Gumashta reviewed by Thejas Nair)

Posted by jd...@apache.org.
HIVE-13393: Beeline: Print help message for the --incremental option (Vaibhav Gumashta reviewed by Thejas Nair)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/794f161c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/794f161c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/794f161c

Branch: refs/heads/llap
Commit: 794f161c136c4d99693eb60222c0f17b10948e0d
Parents: 4eb9603
Author: Vaibhav Gumashta <vg...@hortonworks.com>
Authored: Thu May 5 15:12:38 2016 -0700
Committer: Vaibhav Gumashta <vg...@hortonworks.com>
Committed: Thu May 5 15:12:38 2016 -0700

----------------------------------------------------------------------
 beeline/src/main/resources/BeeLine.properties | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/794f161c/beeline/src/main/resources/BeeLine.properties
----------------------------------------------------------------------
diff --git a/beeline/src/main/resources/BeeLine.properties b/beeline/src/main/resources/BeeLine.properties
index a118c09..bc40685 100644
--- a/beeline/src/main/resources/BeeLine.properties
+++ b/beeline/src/main/resources/BeeLine.properties
@@ -171,7 +171,14 @@ cmd-usage: Usage: java org.apache.hive.cli.beeline.BeeLine \n \
 \  --silent=[true/false]           be more silent\n \
 \  --autosave=[true/false]         automatically save preferences\n \
 \  --outputformat=[table/vertical/csv2/tsv2/dsv/csv/tsv]  format mode for result display\n \
-\                                  Note that csv, and tsv are deprecated - use csv2, tsv2 instead\n\
+\                                  Note that csv, and tsv are deprecated - use csv2, tsv2 instead\n \
+\  --incremental=[true/false]      Defaults to false. When set to false, the entire result set\n \
+\                                  is fetched and buffered before being displayed, yielding optimal\n \
+\                                  display column sizing. When set to true, result rows are displayed\n \
+\                                  immediately as they are fetched, yielding lower latency and\n \
+\                                  memory usage at the price of extra display column padding.\n \
+\                                  Setting --incremental=true is recommended if you encounter an OutOfMemory\n \
+\                                  on the client side (due to the fetched result set size being large).\n \
 \  --truncateTable=[true/false]    truncate table column when it exceeds length\n \
 \  --delimiterForDSV=DELIMITER     specify the delimiter for delimiter-separated values output format (default: |)\n \
 \  --isolation=LEVEL               set the transaction isolation level\n \

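In practice, a client that hits an OutOfMemory on a large result set would start BeeLine with something like: beeline -u jdbc:hive2://localhost:10000 --incremental=true (connection URL illustrative).
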

[13/13] hive git commit: Merge branch 'master' into llap

Posted by jd...@apache.org.
Merge branch 'master' into llap


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f089f2e6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f089f2e6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f089f2e6

Branch: refs/heads/llap
Commit: f089f2e64241592ecf8144d044bec8a0659ff422
Parents: 89ec219 3f07bfc
Author: Jason Dere <jd...@hortonworks.com>
Authored: Fri May 6 10:14:21 2016 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Fri May 6 10:14:21 2016 -0700

----------------------------------------------------------------------
 beeline/src/main/resources/BeeLine.properties   |   9 +-
 cli/pom.xml                                     |   6 +
 common/pom.xml                                  |   6 +
 .../org/apache/hadoop/hive/common/LogUtils.java |  35 +-
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   8 +
 .../src/main/resources/hive-log4j2.properties   |   2 +-
 .../hadoop/hive/conf/TestHiveAsyncLogging.java  |  49 ++
 data/conf/hive-log4j2.properties                |   2 +-
 hcatalog/core/pom.xml                           |   6 +
 .../hive/metastore/TestHiveMetaStoreTxns.java   |   2 +-
 llap-server/bin/llapDaemon.sh                   |   2 +-
 .../hadoop/hive/llap/cli/LlapServiceDriver.java |  21 +-
 .../hive/llap/daemon/impl/LlapDaemon.java       |  10 +-
 ...doop-metrics2-llapdaemon.properties.template |  50 ++
 ...trics2-llaptaskscheduler.properties.template |  50 ++
 .../hadoop-metrics2.properties.template         |  50 --
 .../main/resources/llap-cli-log4j2.properties   |   2 +-
 .../resources/llap-daemon-log4j2.properties     |   4 +-
 llap-server/src/main/resources/package.py       |   6 +-
 .../resources/llap-daemon-log4j2.properties     |   4 +-
 .../tezplugins/LlapTaskSchedulerService.java    |   2 +-
 .../metrics/LlapTaskSchedulerMetrics.java       |   6 +-
 metastore/pom.xml                               |   6 +
 .../upgrade/derby/035-HIVE-13395.derby.sql      |  11 +
 .../upgrade/derby/hive-schema-2.1.0.derby.sql   |   2 +-
 .../derby/hive-txn-schema-1.3.0.derby.sql       |  11 +-
 .../derby/hive-txn-schema-2.1.0.derby.sql       | 130 ++++
 .../derby/upgrade-1.2.0-to-1.3.0.derby.sql      |   1 +
 .../derby/upgrade-2.0.0-to-2.1.0.derby.sql      |   1 +
 .../upgrade/mssql/020-HIVE-13395.mssql.sql      |   9 +
 .../upgrade/mssql/hive-schema-1.3.0.mssql.sql   |  12 +-
 .../upgrade/mssql/hive-schema-2.1.0.mssql.sql   |  12 +-
 .../mssql/upgrade-1.2.0-to-1.3.0.mssql.sql      |   1 +
 .../mssql/upgrade-2.0.0-to-2.1.0.mssql.sql      |   1 +
 .../upgrade/mysql/035-HIVE-13395.mysql.sql      |  10 +
 .../upgrade/mysql/hive-schema-2.1.0.mysql.sql   |   2 +-
 .../mysql/hive-txn-schema-1.3.0.mysql.sql       |  10 +
 .../mysql/hive-txn-schema-2.1.0.mysql.sql       | 131 ++++
 .../mysql/upgrade-1.2.0-to-1.3.0.mysql.sql      |   1 +
 .../mysql/upgrade-2.0.0-to-2.1.0.mysql.sql      |   1 +
 .../upgrade/oracle/035-HIVE-13395.oracle.sql    |  10 +
 .../upgrade/oracle/hive-schema-2.1.0.oracle.sql |   2 +-
 .../oracle/hive-txn-schema-1.3.0.oracle.sql     |  12 +-
 .../oracle/hive-txn-schema-2.1.0.oracle.sql     | 129 ++++
 .../oracle/upgrade-1.2.0-to-1.3.0.oracle.sql    |   1 +
 .../oracle/upgrade-2.0.0-to-2.1.0.oracle.sql    |   1 +
 .../postgres/034-HIVE-13395.postgres.sql        |  10 +
 .../postgres/hive-schema-2.1.0.postgres.sql     |   2 +-
 .../postgres/hive-txn-schema-1.3.0.postgres.sql |  11 +-
 .../postgres/hive-txn-schema-2.1.0.postgres.sql | 129 ++++
 .../upgrade-1.2.0-to-1.3.0.postgres.sql         |   1 +
 .../upgrade-2.0.0-to-2.1.0.postgres.sql         |   1 +
 .../hadoop/hive/metastore/HiveMetaStore.java    |   1 +
 .../hadoop/hive/metastore/txn/TxnDbUtil.java    | 130 ++--
 .../hadoop/hive/metastore/txn/TxnHandler.java   | 466 +++++++++++---
 .../hadoop/hive/metastore/txn/TxnStore.java     |   8 +-
 .../hadoop/hive/metastore/txn/TxnUtils.java     |   2 +
 .../metastore/txn/TestCompactionTxnHandler.java |   6 +-
 .../hive/metastore/txn/TestTxnHandler.java      |  29 +-
 pom.xml                                         |   2 +
 ql/pom.xml                                      |   6 +
 .../java/org/apache/hadoop/hive/ql/Driver.java  |  11 +-
 .../org/apache/hadoop/hive/ql/ErrorMsg.java     |   2 +-
 .../hadoop/hive/ql/exec/OperatorUtils.java      |   2 +-
 .../org/apache/hadoop/hive/ql/exec/Task.java    |  21 +-
 .../apache/hadoop/hive/ql/exec/TaskResult.java  |   7 +-
 .../apache/hadoop/hive/ql/exec/TaskRunner.java  |   5 +-
 .../hive/ql/exec/mr/HadoopJobExecHelper.java    |   1 +
 .../hadoop/hive/ql/exec/mr/JobDebugger.java     |  18 +-
 .../hadoop/hive/ql/lockmgr/DbLockManager.java   |   5 +-
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java    |  27 +-
 .../calcite/translator/JoinTypeCheckCtx.java    |   2 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |  17 +-
 .../hadoop/hive/ql/parse/TypeCheckCtx.java      |  19 +-
 .../hive/ql/parse/TypeCheckProcFactory.java     |  26 +
 .../hadoop/hive/ql/txn/AcidWriteSetService.java |  61 ++
 .../txn/compactor/HouseKeeperServiceBase.java   |   2 +-
 .../hadoop/hive/ql/txn/compactor/Initiator.java |   2 +-
 .../hadoop/hive/ql/txn/compactor/Worker.java    |   2 +-
 .../main/resources/hive-exec-log4j2.properties  |   2 +-
 .../resources/tez-container-log4j2.properties   |   2 +-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java |   2 +-
 .../apache/hadoop/hive/ql/io/TestAcidUtils.java |  20 +
 .../hive/ql/lockmgr/TestDbTxnManager.java       |   7 +
 .../hive/ql/lockmgr/TestDbTxnManager2.java      | 610 ++++++++++++++++++-
 .../hive/ql/txn/compactor/TestCleaner.java      |   4 +
 .../queries/clientpositive/constantPropWhen.q   |   2 +
 .../hive/ptest/execution/ExecutionPhase.java    |   2 +
 .../hive/ptest/execution/HostExecutor.java      |  48 +-
 .../hive/ptest/execution/LocalCommand.java      |  31 +-
 .../apache/hive/ptest/execution/PrepPhase.java  |   1 +
 .../apache/hive/ptest/execution/conf/Host.java  |   3 +
 92 files changed, 2294 insertions(+), 313 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/f089f2e6/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/f089f2e6/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/f089f2e6/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/f089f2e6/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/f089f2e6/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java
----------------------------------------------------------------------


[10/13] hive git commit: HIVE-13027: Configuration changes to improve logging performance (Prasanth Jayachandran reviewed by Sergey Shelukhin)

Posted by jd...@apache.org.
HIVE-13027: Configuration changes to improve logging performance (Prasanth Jayachandran reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b870d526
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b870d526
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b870d526

Branch: refs/heads/llap
Commit: b870d526edbac1831d66f2529cf1a854b57bddb2
Parents: 0cc4045
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Fri May 6 03:08:28 2016 -0500
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Fri May 6 03:09:40 2016 -0500

----------------------------------------------------------------------
 cli/pom.xml                                     |  6 +++
 common/pom.xml                                  |  6 +++
 .../org/apache/hadoop/hive/common/LogUtils.java | 35 ++++++++++++--
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  6 +++
 .../src/main/resources/hive-log4j2.properties   |  2 +-
 .../hadoop/hive/conf/TestHiveAsyncLogging.java  | 49 ++++++++++++++++++++
 data/conf/hive-log4j2.properties                |  2 +-
 hcatalog/core/pom.xml                           |  6 +++
 llap-server/bin/llapDaemon.sh                   |  2 +-
 .../hive/llap/daemon/impl/LlapDaemon.java       | 10 ++--
 .../main/resources/llap-cli-log4j2.properties   |  2 +-
 .../resources/llap-daemon-log4j2.properties     |  4 +-
 .../resources/llap-daemon-log4j2.properties     |  4 +-
 metastore/pom.xml                               |  6 +++
 pom.xml                                         |  2 +
 ql/pom.xml                                      |  6 +++
 .../main/resources/hive-exec-log4j2.properties  |  2 +-
 .../resources/tez-container-log4j2.properties   |  2 +-
 18 files changed, 136 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/b870d526/cli/pom.xml
----------------------------------------------------------------------
diff --git a/cli/pom.xml b/cli/pom.xml
index 76f6d11..6f2e664 100644
--- a/cli/pom.xml
+++ b/cli/pom.xml
@@ -82,6 +82,12 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>com.lmax</groupId>
+      <artifactId>disruptor</artifactId>
+      <version>${disruptor.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>jline</groupId>
       <artifactId>jline</artifactId>
       <version>${jline.version}</version>

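The test-scoped disruptor dependency (repeated below for the other modules) is what allows tests to run with hive.async.log.enabled=true: Log4j2's asynchronous loggers need the LMAX disruptor on the classpath, as the new TestHiveAsyncLogging also notes.
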
http://git-wip-us.apache.org/repos/asf/hive/blob/b870d526/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index 67aab7c..9933072 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -164,6 +164,12 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>com.lmax</groupId>
+      <artifactId>disruptor</artifactId>
+      <version>${disruptor.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.json</groupId>
       <artifactId>json</artifactId>
       <version>${json.version}</version>

http://git-wip-us.apache.org/repos/asf/hive/blob/b870d526/common/src/java/org/apache/hadoop/hive/common/LogUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/LogUtils.java b/common/src/java/org/apache/hadoop/hive/common/LogUtils.java
index adcf805..599e798 100644
--- a/common/src/java/org/apache/hadoop/hive/common/LogUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/LogUtils.java
@@ -21,12 +21,18 @@ package org.apache.hadoop.hive.common;
 import java.io.File;
 import java.net.URL;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.LoggerContext;
 import org.apache.logging.log4j.core.config.Configurator;
+import org.apache.logging.log4j.core.impl.Log4jContextFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Utilities common to logging operations.
  */
@@ -66,8 +72,14 @@ public class LogUtils {
   }
 
   private static String initHiveLog4jCommon(ConfVars confVarName)
-    throws LogInitializationException {
+      throws LogInitializationException {
     HiveConf conf = new HiveConf();
+    return initHiveLog4jCommon(conf, confVarName);
+  }
+
+  @VisibleForTesting
+  public static String initHiveLog4jCommon(HiveConf conf, ConfVars confVarName)
+    throws LogInitializationException {
     if (HiveConf.getVar(conf, confVarName).equals("")) {
       // if log4j configuration file not set, or could not found, use default setting
       return initHiveLog4jDefault(conf, "", confVarName);
@@ -91,13 +103,28 @@ public class LogUtils {
           }
           System.setProperty(HiveConf.ConfVars.HIVEQUERYID.toString(), queryId);
         }
+        final boolean async = checkAndSetAsyncLogging(conf);
         Configurator.initialize(null, log4jFileName);
         logConfigLocation(conf);
-        return ("Logging initialized using configuration in " + log4jConfigFile);
+        return "Logging initialized using configuration in " + log4jConfigFile + " Async: " + async;
       }
     }
   }
 
+  public static boolean checkAndSetAsyncLogging(final Configuration conf) {
+    final boolean asyncLogging = HiveConf.getBoolVar(conf, ConfVars.HIVE_ASYNC_LOG_ENABLED);
+    if (asyncLogging) {
+      System.setProperty("Log4jContextSelector",
+          "org.apache.logging.log4j.core.async.AsyncLoggerContextSelector");
+      // default is ClassLoaderContextSelector which is created during automatic logging
+      // initialization in a static initialization block.
+      // Changing ContextSelector at runtime requires creating new context factory which will
+      // internally create new context selector based on system property.
+      LogManager.setFactory(new Log4jContextFactory());
+    }
+    return asyncLogging;
+  }
+
   private static String initHiveLog4jDefault(
     HiveConf conf, String logMessage, ConfVars confVarName)
     throws LogInitializationException {
@@ -118,9 +145,11 @@ public class LogUtils {
         break;
     }
     if (hive_l4j != null) {
+      final boolean async = checkAndSetAsyncLogging(conf);
       Configurator.initialize(null, hive_l4j.toString());
       logConfigLocation(conf);
-      return (logMessage + "\n" + "Logging initialized using configuration in " + hive_l4j);
+      return (logMessage + "\n" + "Logging initialized using configuration in " + hive_l4j +
+          " Async: " + async);
     } else {
       throw new LogInitializationException(
         logMessage + "Unable to initialize logging using "

http://git-wip-us.apache.org/repos/asf/hive/blob/b870d526/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index bb74d99..07dff08 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1955,6 +1955,12 @@ public class HiveConf extends Configuration {
         "If the property is not set, then logging will be initialized using hive-exec-log4j2.properties found on the classpath.\n" +
         "If the property is set, the value must be a valid URI (java.net.URI, e.g. \"file:///tmp/my-logging.xml\"), \n" +
         "which you can then extract a URL from and pass to PropertyConfigurator.configure(URL)."),
+    HIVE_ASYNC_LOG_ENABLED("hive.async.log.enabled", true,
+        "Whether to enable Log4j2's asynchronous logging. Asynchronous logging can give\n" +
+        " significant performance improvement as logging will be handled in separate thread\n" +
+        " that uses LMAX disruptor queue for buffering log messages.\n" +
+        " Refer https://logging.apache.org/log4j/2.x/manual/async.html for benefits and\n" +
+        " drawbacks."),
 
     HIVE_LOG_EXPLAIN_OUTPUT("hive.log.explain.output", false,
         "Whether to log explain output for every query.\n" +

http://git-wip-us.apache.org/repos/asf/hive/blob/b870d526/common/src/main/resources/hive-log4j2.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/hive-log4j2.properties b/common/src/main/resources/hive-log4j2.properties
index cf0369a..2f67be8 100644
--- a/common/src/main/resources/hive-log4j2.properties
+++ b/common/src/main/resources/hive-log4j2.properties
@@ -36,7 +36,7 @@ appender.console.layout.type = PatternLayout
 appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} [%t]: %p %c{2}: %m%n
 
 # daily rolling file appender
-appender.DRFA.type = RollingFile
+appender.DRFA.type = RollingRandomAccessFile
 appender.DRFA.name = DRFA
 appender.DRFA.fileName = ${sys:hive.log.dir}/${sys:hive.log.file}
 # Use %pid in the filePattern to append <process-id>@<host-name> to the filename if you want separate log files for different CLI session

http://git-wip-us.apache.org/repos/asf/hive/blob/b870d526/common/src/test/org/apache/hadoop/hive/conf/TestHiveAsyncLogging.java
----------------------------------------------------------------------
diff --git a/common/src/test/org/apache/hadoop/hive/conf/TestHiveAsyncLogging.java b/common/src/test/org/apache/hadoop/hive/conf/TestHiveAsyncLogging.java
new file mode 100644
index 0000000..e2631cf
--- /dev/null
+++ b/common/src/test/org/apache/hadoop/hive/conf/TestHiveAsyncLogging.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.conf;
+
+import org.apache.hadoop.hive.common.LogUtils;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.async.AsyncLoggerContextSelector;
+import org.apache.logging.log4j.core.impl.Log4jContextFactory;
+import org.apache.logging.log4j.core.selector.ClassLoaderContextSelector;
+import org.apache.logging.log4j.core.selector.ContextSelector;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+public class TestHiveAsyncLogging extends TestCase {
+
+  // this test requires disruptor jar in classpath
+  @Test
+  public void testAsyncLoggingInitialization() throws Exception {
+    HiveConf conf = new HiveConf();
+    conf.setBoolVar(ConfVars.HIVE_ASYNC_LOG_ENABLED, false);
+    LogUtils.initHiveLog4jCommon(conf, ConfVars.HIVE_LOG4J_FILE);
+    Log4jContextFactory log4jContextFactory = (Log4jContextFactory) LogManager.getFactory();
+    ContextSelector contextSelector = log4jContextFactory.getSelector();
+    assertTrue(contextSelector instanceof ClassLoaderContextSelector);
+
+    conf.setBoolVar(ConfVars.HIVE_ASYNC_LOG_ENABLED, true);
+    LogUtils.initHiveLog4jCommon(conf, ConfVars.HIVE_LOG4J_FILE);
+    log4jContextFactory = (Log4jContextFactory) LogManager.getFactory();
+    contextSelector = log4jContextFactory.getSelector();
+    assertTrue(contextSelector instanceof AsyncLoggerContextSelector);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/b870d526/data/conf/hive-log4j2.properties
----------------------------------------------------------------------
diff --git a/data/conf/hive-log4j2.properties b/data/conf/hive-log4j2.properties
index 6bace1f..f60d5be 100644
--- a/data/conf/hive-log4j2.properties
+++ b/data/conf/hive-log4j2.properties
@@ -35,7 +35,7 @@ appender.console.layout.type = PatternLayout
 appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} [%t]: %p %c{2}: %m%n
 
 # daily rolling file appender
-appender.DRFA.type = RollingFile
+appender.DRFA.type = RollingRandomAccessFile
 appender.DRFA.name = DRFA
 appender.DRFA.fileName = ${sys:hive.log.dir}/${sys:hive.log.file}
 appender.DRFA.filePattern = ${sys:hive.log.dir}/${sys:hive.log.file}.%d{yyyy-MM-dd}

http://git-wip-us.apache.org/repos/asf/hive/blob/b870d526/hcatalog/core/pom.xml
----------------------------------------------------------------------
diff --git a/hcatalog/core/pom.xml b/hcatalog/core/pom.xml
index 1e970bf..c9a6c01 100644
--- a/hcatalog/core/pom.xml
+++ b/hcatalog/core/pom.xml
@@ -131,6 +131,12 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>com.lmax</groupId>
+      <artifactId>disruptor</artifactId>
+      <version>${disruptor.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <version>${hadoop.version}</version>

http://git-wip-us.apache.org/repos/asf/hive/blob/b870d526/llap-server/bin/llapDaemon.sh
----------------------------------------------------------------------
diff --git a/llap-server/bin/llapDaemon.sh b/llap-server/bin/llapDaemon.sh
index 6f57998..566bbc8 100755
--- a/llap-server/bin/llapDaemon.sh
+++ b/llap-server/bin/llapDaemon.sh
@@ -113,7 +113,7 @@ case $startStop in
     #rotate_log $logOut
     echo starting llapdaemon, logging to $logLog and $logOut
     export LLAP_DAEMON_LOGFILE=${LLAP_DAEMON_LOG_BASE}.log
-    nohup nice -n $LLAP_DAEMON_NICENESS "$LLAP_DAEMON_BIN_HOME"/runLlapDaemon.sh run  > "$logOut" 2>&1 < /dev/null &
+    nohup nice -n $LLAP_DAEMON_NICENESS "$LLAP_DAEMON_BIN_HOME"/runLlapDaemon.sh run  >> "$logOut" 2>&1 < /dev/null &
     echo $! > $pid
     ;;
           

http://git-wip-us.apache.org/repos/asf/hive/blob/b870d526/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index e662de9..2e07a8c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -31,6 +31,7 @@ import java.util.regex.Pattern;
 import javax.management.ObjectName;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.LogUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.DaemonId;
@@ -119,7 +120,8 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
       int mngPort, int shufflePort, int webPort, String appName) {
     super("LlapDaemon");
 
-    initializeLogging();
+    initializeLogging(daemonConf);
+
     printAsciiArt();
 
     Preconditions.checkArgument(numExecutors > 0);
@@ -264,13 +266,15 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
     addIfService(amReporter);
   }
 
-  private void initializeLogging() {
+  private void initializeLogging(final Configuration conf) {
     long start = System.currentTimeMillis();
     URL llap_l4j2 = LlapDaemon.class.getClassLoader().getResource(LOG4j2_PROPERTIES_FILE);
     if (llap_l4j2 != null) {
+      final boolean async = LogUtils.checkAndSetAsyncLogging(conf);
       Configurator.initialize("LlapDaemonLog4j2", llap_l4j2.toString());
       long end = System.currentTimeMillis();
-      LOG.warn("LLAP daemon logging initialized from {} in {} ms", llap_l4j2, (end - start));
+      LOG.warn("LLAP daemon logging initialized from {} in {} ms. Async: {}",
+          llap_l4j2, (end - start), async);
     } else {
       throw new RuntimeException("Log initialization failed." +
           " Unable to locate " + LOG4j2_PROPERTIES_FILE + " file in classpath");

http://git-wip-us.apache.org/repos/asf/hive/blob/b870d526/llap-server/src/main/resources/llap-cli-log4j2.properties
----------------------------------------------------------------------
diff --git a/llap-server/src/main/resources/llap-cli-log4j2.properties b/llap-server/src/main/resources/llap-cli-log4j2.properties
index 2f27b5e..c6b8f20 100644
--- a/llap-server/src/main/resources/llap-cli-log4j2.properties
+++ b/llap-server/src/main/resources/llap-cli-log4j2.properties
@@ -36,7 +36,7 @@ appender.console.layout.type = PatternLayout
 appender.console.layout.pattern = %p %c{2}: %m%n
 
 # daily rolling file appender
-appender.DRFA.type = RollingFile
+appender.DRFA.type = RollingRandomAccessFile
 appender.DRFA.name = DRFA
 appender.DRFA.fileName = ${sys:hive.log.dir}/${sys:hive.log.file}
 # Use %pid in the filePattern to append <process-id>@<host-name> to the filename if you want separate log files for different CLI sessions

http://git-wip-us.apache.org/repos/asf/hive/blob/b870d526/llap-server/src/main/resources/llap-daemon-log4j2.properties
----------------------------------------------------------------------
diff --git a/llap-server/src/main/resources/llap-daemon-log4j2.properties b/llap-server/src/main/resources/llap-daemon-log4j2.properties
index 268eb59..c5166e3 100644
--- a/llap-server/src/main/resources/llap-daemon-log4j2.properties
+++ b/llap-server/src/main/resources/llap-daemon-log4j2.properties
@@ -38,7 +38,7 @@ appender.console.layout.type = PatternLayout
 appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} [%t%x] %p %c{2} : %m%n
 
 # rolling file appender
-appender.RFA.type = RollingFile
+appender.RFA.type = RollingRandomAccessFile
 appender.RFA.name = RFA
 appender.RFA.fileName = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.log.file}
 appender.RFA.filePattern = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.log.file}_%i
@@ -51,7 +51,7 @@ appender.RFA.strategy.type = DefaultRolloverStrategy
 appender.RFA.strategy.max = ${sys:llap.daemon.log.maxbackupindex}
 
 # history file appender
-appender.HISTORYAPPENDER.type = RollingFile
+appender.HISTORYAPPENDER.type = RollingRandomAccessFile
 appender.HISTORYAPPENDER.name = HISTORYAPPENDER
 appender.HISTORYAPPENDER.fileName = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.historylog.file}
 appender.HISTORYAPPENDER.filePattern = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.historylog.file}_%i

http://git-wip-us.apache.org/repos/asf/hive/blob/b870d526/llap-server/src/test/resources/llap-daemon-log4j2.properties
----------------------------------------------------------------------
diff --git a/llap-server/src/test/resources/llap-daemon-log4j2.properties b/llap-server/src/test/resources/llap-daemon-log4j2.properties
index 7b5f4ed..2714dbd 100644
--- a/llap-server/src/test/resources/llap-daemon-log4j2.properties
+++ b/llap-server/src/test/resources/llap-daemon-log4j2.properties
@@ -38,7 +38,7 @@ appender.console.layout.type = PatternLayout
 appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} [%t%x] %p %c{2} : %m%n
 
 # rolling file appender
-appender.RFA.type = RollingFile
+appender.RFA.type = RollingRandomAccessFile
 appender.RFA.name = RFA
 appender.RFA.fileName = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.log.file}
 appender.RFA.filePattern = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.log.file}_%i
@@ -51,7 +51,7 @@ appender.RFA.strategy.type = DefaultRolloverStrategy
 appender.RFA.strategy.max = ${sys:llap.daemon.log.maxbackupindex}
 
 # history file appender
-appender.HISTORYAPPENDER.type = RollingFile
+appender.HISTORYAPPENDER.type = RollingRandomAccessFile
 appender.HISTORYAPPENDER.name = HISTORYAPPENDER
 appender.HISTORYAPPENDER.fileName = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.historylog.file}
 appender.HISTORYAPPENDER.filePattern = ${sys:llap.daemon.log.dir}/${sys:llap.daemon.historylog.file}_%i

http://git-wip-us.apache.org/repos/asf/hive/blob/b870d526/metastore/pom.xml
----------------------------------------------------------------------
diff --git a/metastore/pom.xml b/metastore/pom.xml
index 8816829..3827a51 100644
--- a/metastore/pom.xml
+++ b/metastore/pom.xml
@@ -228,6 +228,12 @@
       <version>${mockito-all.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.lmax</groupId>
+      <artifactId>disruptor</artifactId>
+      <version>${disruptor.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/hive/blob/b870d526/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index dff2a72..bde6857 100644
--- a/pom.xml
+++ b/pom.xml
@@ -134,6 +134,8 @@
     <hadoop.version>2.6.0</hadoop.version>
     <hadoop.bin.path>${basedir}/${hive.path.to.root}/testutils/hadoop</hadoop.bin.path>
     <hbase.version>1.1.1</hbase.version>
+    <!-- required for logging test to avoid including hbase which pulls disruptor transitively -->
+    <disruptor.version>3.3.0</disruptor.version>
     <!-- httpcomponents are not always in version sync -->
     <httpcomponents.client.version>4.4</httpcomponents.client.version>
     <httpcomponents.core.version>4.4</httpcomponents.core.version>

http://git-wip-us.apache.org/repos/asf/hive/blob/b870d526/ql/pom.xml
----------------------------------------------------------------------
diff --git a/ql/pom.xml b/ql/pom.xml
index aaa3271..8b2d0e6 100644
--- a/ql/pom.xml
+++ b/ql/pom.xml
@@ -325,6 +325,12 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>com.lmax</groupId>
+      <artifactId>disruptor</artifactId>
+      <version>${disruptor.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.codehaus.groovy</groupId>
       <artifactId>groovy-all</artifactId>
       <version>${groovy.version}</version>

http://git-wip-us.apache.org/repos/asf/hive/blob/b870d526/ql/src/main/resources/hive-exec-log4j2.properties
----------------------------------------------------------------------
diff --git a/ql/src/main/resources/hive-exec-log4j2.properties b/ql/src/main/resources/hive-exec-log4j2.properties
index 4fba04c..21e24fd 100644
--- a/ql/src/main/resources/hive-exec-log4j2.properties
+++ b/ql/src/main/resources/hive-exec-log4j2.properties
@@ -36,7 +36,7 @@ appender.console.layout.type = PatternLayout
 appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} [%t]: %p %c{2}: %m%n
 
 # simple file appender
-appender.FA.type = File
+appender.FA.type = RandomAccessFile
 appender.FA.name = FA
 appender.FA.fileName = ${sys:hive.log.dir}/${sys:hive.log.file}
 appender.FA.layout.type = PatternLayout

http://git-wip-us.apache.org/repos/asf/hive/blob/b870d526/ql/src/main/resources/tez-container-log4j2.properties
----------------------------------------------------------------------
diff --git a/ql/src/main/resources/tez-container-log4j2.properties b/ql/src/main/resources/tez-container-log4j2.properties
index 5d2b138..a048b17 100644
--- a/ql/src/main/resources/tez-container-log4j2.properties
+++ b/ql/src/main/resources/tez-container-log4j2.properties
@@ -28,7 +28,7 @@ property.tez.container.log.file = syslog
 appenders = CLA
 
 # daily rolling file appender
-appender.CLA.type = RollingFile
+appender.CLA.type = RollingRandomAccessFile
 appender.CLA.name = CLA
 appender.CLA.fileName = ${sys:tez.container.log.dir}/${sys:tez.container.log.file}
 appender.CLA.filePattern = ${sys:tez.container.log.dir}/${sys:tez.container.log.file}.%d{yyyy-MM-dd}
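(Aside: the repeated RollingFile to RollingRandomAccessFile switch is a performance change: the RandomAccessFile-based appenders are always buffered, so many log events go out per write syscall instead of one. Illustrative Java only, not Log4j2 internals; the pattern behind such appenders is roughly the following:)

import java.io.RandomAccessFile;
import java.nio.ByteBuffer;

public class BufferedAppendSketch {
  public static void main(String[] args) throws Exception {
    try (RandomAccessFile raf = new RandomAccessFile("sketch.log", "rw")) {
      raf.seek(raf.length());                  // append semantics
      ByteBuffer buf = ByteBuffer.allocate(256 * 1024);
      for (int i = 0; i < 10_000; i++) {
        byte[] line = ("event " + i + "\n").getBytes("UTF-8");
        if (buf.remaining() < line.length) {   // flush only when the buffer fills
          buf.flip();
          raf.getChannel().write(buf);
          buf.clear();
        }
        buf.put(line);
      }
      buf.flip();
      raf.getChannel().write(buf);             // final flush
    }
  }
}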


[03/13] hive git commit: HIVE-13395 Lost Update problem in ACID (Eugene Koifman, reviewed by Alan Gates)

Posted by jd...@apache.org.
HIVE-13395 Lost Update problem in ACID (Eugene Koifman, reviewed by Alan Gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/10d05491
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/10d05491
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/10d05491

Branch: refs/heads/llap
Commit: 10d05491379bb6f8e607a030811e8d4e530604de
Parents: 0927187
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Thu May 5 12:45:44 2016 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Thu May 5 12:45:44 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   2 +
 .../hive/metastore/TestHiveMetaStoreTxns.java   |   2 +-
 .../upgrade/derby/035-HIVE-13395.derby.sql      |  11 +
 .../upgrade/derby/hive-schema-2.1.0.derby.sql   |   2 +-
 .../derby/hive-txn-schema-1.3.0.derby.sql       |  11 +-
 .../derby/hive-txn-schema-2.1.0.derby.sql       | 130 ++++
 .../derby/upgrade-1.2.0-to-1.3.0.derby.sql      |   1 +
 .../derby/upgrade-2.0.0-to-2.1.0.derby.sql      |   1 +
 .../upgrade/mssql/020-HIVE-13395.mssql.sql      |   9 +
 .../upgrade/mssql/hive-schema-1.3.0.mssql.sql   |  12 +-
 .../upgrade/mssql/hive-schema-2.1.0.mssql.sql   |  12 +-
 .../mssql/upgrade-1.2.0-to-1.3.0.mssql.sql      |   1 +
 .../mssql/upgrade-2.0.0-to-2.1.0.mssql.sql      |   1 +
 .../upgrade/mysql/035-HIVE-13395.mysql.sql      |  10 +
 .../upgrade/mysql/hive-schema-2.1.0.mysql.sql   |   2 +-
 .../mysql/hive-txn-schema-2.1.0.mysql.sql       | 131 ++++
 .../mysql/upgrade-1.2.0-to-1.3.0.mysql.sql      |   1 +
 .../mysql/upgrade-2.0.0-to-2.1.0.mysql.sql      |   1 +
 .../upgrade/oracle/035-HIVE-13395.oracle.sql    |  10 +
 .../upgrade/oracle/hive-schema-2.1.0.oracle.sql |   2 +-
 .../oracle/hive-txn-schema-1.3.0.oracle.sql     |  12 +-
 .../oracle/hive-txn-schema-2.1.0.oracle.sql     | 129 ++++
 .../oracle/upgrade-1.2.0-to-1.3.0.oracle.sql    |   1 +
 .../oracle/upgrade-2.0.0-to-2.1.0.oracle.sql    |   1 +
 .../postgres/034-HIVE-13395.postgres.sql        |  10 +
 .../postgres/hive-schema-2.1.0.postgres.sql     |   2 +-
 .../postgres/hive-txn-schema-1.3.0.postgres.sql |  11 +-
 .../postgres/hive-txn-schema-2.1.0.postgres.sql | 129 ++++
 .../upgrade-1.2.0-to-1.3.0.postgres.sql         |   1 +
 .../upgrade-2.0.0-to-2.1.0.postgres.sql         |   1 +
 .../hadoop/hive/metastore/HiveMetaStore.java    |   1 +
 .../hadoop/hive/metastore/txn/TxnDbUtil.java    | 130 ++--
 .../hadoop/hive/metastore/txn/TxnHandler.java   | 466 +++++++++++---
 .../hadoop/hive/metastore/txn/TxnStore.java     |   8 +-
 .../hadoop/hive/metastore/txn/TxnUtils.java     |   2 +
 .../metastore/txn/TestCompactionTxnHandler.java |   6 +-
 .../hive/metastore/txn/TestTxnHandler.java      |  29 +-
 .../org/apache/hadoop/hive/ql/ErrorMsg.java     |   2 +-
 .../hadoop/hive/ql/lockmgr/DbLockManager.java   |   5 +-
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java    |  27 +-
 .../hadoop/hive/ql/txn/AcidWriteSetService.java |  61 ++
 .../txn/compactor/HouseKeeperServiceBase.java   |   2 +-
 .../hadoop/hive/ql/txn/compactor/Initiator.java |   2 +-
 .../hadoop/hive/ql/txn/compactor/Worker.java    |   2 +-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java |   2 +-
 .../apache/hadoop/hive/ql/io/TestAcidUtils.java |  20 +
 .../hive/ql/lockmgr/TestDbTxnManager.java       |   7 +
 .../hive/ql/lockmgr/TestDbTxnManager2.java      | 610 ++++++++++++++++++-
 .../hive/ql/txn/compactor/TestCleaner.java      |   4 +
 49 files changed, 1843 insertions(+), 192 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 06a6906..bb74d99 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1769,6 +1769,8 @@ public class HiveConf extends Configuration {
       new TimeValidator(TimeUnit.MILLISECONDS), "Time delay of 1st reaper run after metastore start"),
     HIVE_TIMEDOUT_TXN_REAPER_INTERVAL("hive.timedout.txn.reaper.interval", "180s",
       new TimeValidator(TimeUnit.MILLISECONDS), "Time interval describing how often the reaper runs"),
+    WRITE_SET_REAPER_INTERVAL("hive.writeset.reaper.interval", "60s",
+      new TimeValidator(TimeUnit.MILLISECONDS), "Frequency of WriteSet reaper runs"),
 
     // For HBase storage handler
     HIVE_HBASE_WAL_ENABLED("hive.hbase.wal.enabled", true,
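(Aside: illustrative usage of the new key; per the TimeValidator above, any supported time unit such as ms, s, or m should work:)

import org.apache.hadoop.hive.conf.HiveConf;

public class WriteSetReaperConfigExample {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // default is 60s; shown here only to illustrate overriding the interval
    conf.set("hive.writeset.reaper.interval", "120s");
    System.out.println(conf.get("hive.writeset.reaper.interval"));
  }
}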

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
index e9ce789..22354ab 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
@@ -187,7 +187,7 @@ public class TestHiveMetaStoreTxns {
         .setDbName("mydb")
         .setTableName("mytable")
         .setPartitionName("mypartition")
-        .setExclusive()
+        .setSemiShared()
         .build())
       .addLockComponent(new LockComponentBuilder()
         .setDbName("mydb")

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/derby/035-HIVE-13395.derby.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/derby/035-HIVE-13395.derby.sql b/metastore/scripts/upgrade/derby/035-HIVE-13395.derby.sql
new file mode 100644
index 0000000..df33b95
--- /dev/null
+++ b/metastore/scripts/upgrade/derby/035-HIVE-13395.derby.sql
@@ -0,0 +1,11 @@
+CREATE TABLE WRITE_SET (
+  WS_DATABASE varchar(128) NOT NULL,
+  WS_TABLE varchar(128) NOT NULL,
+  WS_PARTITION varchar(767),
+  WS_TXNID bigint NOT NULL,
+  WS_COMMIT_ID bigint NOT NULL,
+  WS_OPERATION_TYPE char(1) NOT NULL
+);
+ALTER TABLE TXN_COMPONENTS ADD TC_OPERATION_TYPE char(1);
+
+

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/derby/hive-schema-2.1.0.derby.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/derby/hive-schema-2.1.0.derby.sql b/metastore/scripts/upgrade/derby/hive-schema-2.1.0.derby.sql
index 1d00499..dc27afc 100644
--- a/metastore/scripts/upgrade/derby/hive-schema-2.1.0.derby.sql
+++ b/metastore/scripts/upgrade/derby/hive-schema-2.1.0.derby.sql
@@ -338,7 +338,7 @@ ALTER TABLE "APP"."SDS" ADD CONSTRAINT "SQL110318025505550" CHECK (IS_COMPRESSED
 -- ----------------------------
 -- Transaction and Lock Tables
 -- ----------------------------
-RUN 'hive-txn-schema-2.0.0.derby.sql';
+RUN 'hive-txn-schema-2.1.0.derby.sql';
 
 -- -----------------------------------------------------------------
 -- Record schema version. Should be the last step in the init script

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/derby/hive-txn-schema-1.3.0.derby.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/derby/hive-txn-schema-1.3.0.derby.sql b/metastore/scripts/upgrade/derby/hive-txn-schema-1.3.0.derby.sql
index 13f3340..480c19e 100644
--- a/metastore/scripts/upgrade/derby/hive-txn-schema-1.3.0.derby.sql
+++ b/metastore/scripts/upgrade/derby/hive-txn-schema-1.3.0.derby.sql
@@ -32,7 +32,8 @@ CREATE TABLE TXN_COMPONENTS (
   TC_TXNID bigint REFERENCES TXNS (TXN_ID),
   TC_DATABASE varchar(128) NOT NULL,
   TC_TABLE varchar(128),
-  TC_PARTITION varchar(767)
+  TC_PARTITION varchar(767),
+  TC_OPERATION_TYPE char(1) NOT NULL
 );
 
 CREATE TABLE COMPLETED_TXN_COMPONENTS (
@@ -117,3 +118,11 @@ CREATE TABLE AUX_TABLE (
   PRIMARY KEY(MT_KEY1, MT_KEY2)
 );
 
+CREATE TABLE WRITE_SET (
+  WS_DATABASE varchar(128) NOT NULL,
+  WS_TABLE varchar(128) NOT NULL,
+  WS_PARTITION varchar(767),
+  WS_TXNID bigint NOT NULL,
+  WS_COMMIT_ID bigint NOT NULL,
+  WS_OPERATION_TYPE char(1) NOT NULL
+);

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/derby/hive-txn-schema-2.1.0.derby.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/derby/hive-txn-schema-2.1.0.derby.sql b/metastore/scripts/upgrade/derby/hive-txn-schema-2.1.0.derby.sql
new file mode 100644
index 0000000..11d86ca
--- /dev/null
+++ b/metastore/scripts/upgrade/derby/hive-txn-schema-2.1.0.derby.sql
@@ -0,0 +1,130 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the License); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an AS IS BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+--
+-- Tables for transaction management
+-- 
+CREATE TABLE TXNS (
+  TXN_ID bigint PRIMARY KEY,
+  TXN_STATE char(1) NOT NULL,
+  TXN_STARTED bigint NOT NULL,
+  TXN_LAST_HEARTBEAT bigint NOT NULL,
+  TXN_USER varchar(128) NOT NULL,
+  TXN_HOST varchar(128) NOT NULL,
+  TXN_AGENT_INFO varchar(128),
+  TXN_META_INFO varchar(128),
+  TXN_HEARTBEAT_COUNT integer
+);
+
+CREATE TABLE TXN_COMPONENTS (
+  TC_TXNID bigint REFERENCES TXNS (TXN_ID),
+  TC_DATABASE varchar(128) NOT NULL,
+  TC_TABLE varchar(128),
+  TC_PARTITION varchar(767),
+  TC_OPERATION_TYPE char(1) NOT NULL
+);
+
+CREATE TABLE COMPLETED_TXN_COMPONENTS (
+  CTC_TXNID bigint,
+  CTC_DATABASE varchar(128) NOT NULL,
+  CTC_TABLE varchar(128),
+  CTC_PARTITION varchar(767)
+);
+
+CREATE TABLE NEXT_TXN_ID (
+  NTXN_NEXT bigint NOT NULL
+);
+INSERT INTO NEXT_TXN_ID VALUES(1);
+
+CREATE TABLE HIVE_LOCKS (
+  HL_LOCK_EXT_ID bigint NOT NULL,
+  HL_LOCK_INT_ID bigint NOT NULL,
+  HL_TXNID bigint,
+  HL_DB varchar(128) NOT NULL,
+  HL_TABLE varchar(128),
+  HL_PARTITION varchar(767),
+  HL_LOCK_STATE char(1) NOT NULL,
+  HL_LOCK_TYPE char(1) NOT NULL,
+  HL_LAST_HEARTBEAT bigint NOT NULL,
+  HL_ACQUIRED_AT bigint,
+  HL_USER varchar(128) NOT NULL,
+  HL_HOST varchar(128) NOT NULL,
+  HL_HEARTBEAT_COUNT integer,
+  HL_AGENT_INFO varchar(128),
+  HL_BLOCKEDBY_EXT_ID bigint,
+  HL_BLOCKEDBY_INT_ID bigint,
+  PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID)
+); 
+
+CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS (HL_TXNID);
+
+CREATE TABLE NEXT_LOCK_ID (
+  NL_NEXT bigint NOT NULL
+);
+INSERT INTO NEXT_LOCK_ID VALUES(1);
+
+CREATE TABLE COMPACTION_QUEUE (
+  CQ_ID bigint PRIMARY KEY,
+  CQ_DATABASE varchar(128) NOT NULL,
+  CQ_TABLE varchar(128) NOT NULL,
+  CQ_PARTITION varchar(767),
+  CQ_STATE char(1) NOT NULL,
+  CQ_TYPE char(1) NOT NULL,
+  CQ_WORKER_ID varchar(128),
+  CQ_START bigint,
+  CQ_RUN_AS varchar(128),
+  CQ_HIGHEST_TXN_ID bigint,
+  CQ_META_INFO varchar(2048) for bit data,
+  CQ_HADOOP_JOB_ID varchar(32)
+);
+
+CREATE TABLE NEXT_COMPACTION_QUEUE_ID (
+  NCQ_NEXT bigint NOT NULL
+);
+INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1);
+
+CREATE TABLE COMPLETED_COMPACTIONS (
+  CC_ID bigint PRIMARY KEY,
+  CC_DATABASE varchar(128) NOT NULL,
+  CC_TABLE varchar(128) NOT NULL,
+  CC_PARTITION varchar(767),
+  CC_STATE char(1) NOT NULL,
+  CC_TYPE char(1) NOT NULL,
+  CC_WORKER_ID varchar(128),
+  CC_START bigint,
+  CC_END bigint,
+  CC_RUN_AS varchar(128),
+  CC_HIGHEST_TXN_ID bigint,
+  CC_META_INFO varchar(2048) for bit data,
+  CC_HADOOP_JOB_ID varchar(32)
+);
+
+CREATE TABLE AUX_TABLE (
+  MT_KEY1 varchar(128) NOT NULL,
+  MT_KEY2 bigint NOT NULL,
+  MT_COMMENT varchar(255),
+  PRIMARY KEY(MT_KEY1, MT_KEY2)
+);
+
+--The first 4 columns make up a natural PK, but since WS_PARTITION is nullable we can't declare it
+--This is a good candidate for an index-organized table
+CREATE TABLE WRITE_SET (
+  WS_DATABASE varchar(128) NOT NULL,
+  WS_TABLE varchar(128) NOT NULL,
+  WS_PARTITION varchar(767),
+  WS_TXNID bigint NOT NULL,
+  WS_COMMIT_ID bigint NOT NULL,
+  WS_OPERATION_TYPE char(1) NOT NULL
+);
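(Aside: WRITE_SET is what closes the lost-update hole: every committed writer records the partitions it touched, and a committing transaction looks for overlapping writers that committed after its snapshot was taken. A hypothetical JDBC probe, not code from this patch; the real check lives in TxnHandler.commitTxn and differs in detail:)

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class WriteSetConflictProbe {
  // Any committed writer on the same partition with a commit id after this
  // transaction's snapshot boundary is a potential lost-update conflict.
  public static boolean hasConflict(Connection conn, String db, String table,
      String partition, long snapshotTxnId) throws SQLException {
    String sql = "SELECT WS_TXNID FROM WRITE_SET" +
        " WHERE WS_DATABASE = ? AND WS_TABLE = ? AND WS_PARTITION = ?" +
        " AND WS_COMMIT_ID > ?";
    try (PreparedStatement ps = conn.prepareStatement(sql)) {
      ps.setString(1, db);
      ps.setString(2, table);
      ps.setString(3, partition);
      ps.setLong(4, snapshotTxnId);
      try (ResultSet rs = ps.executeQuery()) {
        return rs.next();  // any overlapping committed writer means a conflict
      }
    }
  }
}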

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/derby/upgrade-1.2.0-to-1.3.0.derby.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/derby/upgrade-1.2.0-to-1.3.0.derby.sql b/metastore/scripts/upgrade/derby/upgrade-1.2.0-to-1.3.0.derby.sql
index 74ecac2..6b90b73 100644
--- a/metastore/scripts/upgrade/derby/upgrade-1.2.0-to-1.3.0.derby.sql
+++ b/metastore/scripts/upgrade/derby/upgrade-1.2.0-to-1.3.0.derby.sql
@@ -10,5 +10,6 @@ RUN '029-HIVE-12822.derby.sql';
 RUN '030-HIVE-12823.derby.sql';
 RUN '031-HIVE-12831.derby.sql';
 RUN '032-HIVE-12832.derby.sql';
+RUN '035-HIVE-13395.derby.sql';
 
 UPDATE "APP".VERSION SET SCHEMA_VERSION='1.3.0', VERSION_COMMENT='Hive release version 1.3.0' where VER_ID=1;

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/derby/upgrade-2.0.0-to-2.1.0.derby.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/derby/upgrade-2.0.0-to-2.1.0.derby.sql b/metastore/scripts/upgrade/derby/upgrade-2.0.0-to-2.1.0.derby.sql
index dde8c45..94c686b 100644
--- a/metastore/scripts/upgrade/derby/upgrade-2.0.0-to-2.1.0.derby.sql
+++ b/metastore/scripts/upgrade/derby/upgrade-2.0.0-to-2.1.0.derby.sql
@@ -1,5 +1,6 @@
 -- Upgrade MetaStore schema from 2.0.0 to 2.1.0
 RUN '033-HIVE-12892.derby.sql';
 RUN '034-HIVE-13076.derby.sql';
+RUN '035-HIVE-13395.derby.sql';
 
 UPDATE "APP".VERSION SET SCHEMA_VERSION='2.1.0', VERSION_COMMENT='Hive release version 2.1.0' where VER_ID=1;

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/mssql/020-HIVE-13395.mssql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mssql/020-HIVE-13395.mssql.sql b/metastore/scripts/upgrade/mssql/020-HIVE-13395.mssql.sql
new file mode 100644
index 0000000..281014c
--- /dev/null
+++ b/metastore/scripts/upgrade/mssql/020-HIVE-13395.mssql.sql
@@ -0,0 +1,9 @@
+CREATE TABLE WRITE_SET (
+  WS_DATABASE nvarchar(128) NOT NULL,
+  WS_TABLE nvarchar(128) NOT NULL,
+  WS_PARTITION nvarchar(767),
+  WS_TXNID bigint NOT NULL,
+  WS_COMMIT_ID bigint NOT NULL,
+  WS_OPERATION_TYPE char(1) NOT NULL
+);
+ALTER TABLE TXN_COMPONENTS ADD TC_OPERATION_TYPE char(1) NULL;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/mssql/hive-schema-1.3.0.mssql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mssql/hive-schema-1.3.0.mssql.sql b/metastore/scripts/upgrade/mssql/hive-schema-1.3.0.mssql.sql
index 57d2343..a184f24 100644
--- a/metastore/scripts/upgrade/mssql/hive-schema-1.3.0.mssql.sql
+++ b/metastore/scripts/upgrade/mssql/hive-schema-1.3.0.mssql.sql
@@ -964,7 +964,8 @@ CREATE TABLE TXN_COMPONENTS(
 	TC_TXNID bigint NULL,
 	TC_DATABASE nvarchar(128) NOT NULL,
 	TC_TABLE nvarchar(128) NULL,
-	TC_PARTITION nvarchar(767) NULL
+	TC_PARTITION nvarchar(767) NULL,
+	TC_OPERATION_TYPE char(1) NOT NULL
 );
 
 ALTER TABLE TXN_COMPONENTS  WITH CHECK ADD FOREIGN KEY(TC_TXNID) REFERENCES TXNS (TXN_ID);
@@ -980,6 +981,15 @@ CREATE TABLE AUX_TABLE (
 )
 );
 
+CREATE TABLE WRITE_SET (
+  WS_DATABASE nvarchar(128) NOT NULL,
+  WS_TABLE nvarchar(128) NOT NULL,
+  WS_PARTITION nvarchar(767),
+  WS_TXNID bigint NOT NULL,
+  WS_COMMIT_ID bigint NOT NULL,
+  WS_OPERATION_TYPE char(1) NOT NULL
+);
+
 
 -- -----------------------------------------------------------------
 -- Record schema version. Should be the last step in the init script

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/mssql/hive-schema-2.1.0.mssql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mssql/hive-schema-2.1.0.mssql.sql b/metastore/scripts/upgrade/mssql/hive-schema-2.1.0.mssql.sql
index 2d9cf76..d9194ff 100644
--- a/metastore/scripts/upgrade/mssql/hive-schema-2.1.0.mssql.sql
+++ b/metastore/scripts/upgrade/mssql/hive-schema-2.1.0.mssql.sql
@@ -977,7 +977,8 @@ CREATE TABLE TXN_COMPONENTS(
 	TC_TXNID bigint NULL,
 	TC_DATABASE nvarchar(128) NOT NULL,
 	TC_TABLE nvarchar(128) NULL,
-	TC_PARTITION nvarchar(767) NULL
+	TC_PARTITION nvarchar(767) NULL,
+	TC_OPERATION_TYPE char(1) NOT NULL
 );
 
 ALTER TABLE TXN_COMPONENTS  WITH CHECK ADD FOREIGN KEY(TC_TXNID) REFERENCES TXNS (TXN_ID);
@@ -1011,6 +1012,15 @@ ALTER TABLE KEY_CONSTRAINTS ADD CONSTRAINT CONSTRAINTS_PK PRIMARY KEY (CONSTRAIN
 
 CREATE INDEX CONSTRAINTS_PARENT_TBL_ID__INDEX ON KEY_CONSTRAINTS(PARENT_TBL_ID);
 
+CREATE TABLE WRITE_SET (
+  WS_DATABASE nvarchar(128) NOT NULL,
+  WS_TABLE nvarchar(128) NOT NULL,
+  WS_PARTITION nvarchar(767),
+  WS_TXNID bigint NOT NULL,
+  WS_COMMIT_ID bigint NOT NULL,
+  WS_OPERATION_TYPE char(1) NOT NULL
+);
+
 
 -- -----------------------------------------------------------------
 -- Record schema version. Should be the last step in the init script

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/mssql/upgrade-1.2.0-to-1.3.0.mssql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mssql/upgrade-1.2.0-to-1.3.0.mssql.sql b/metastore/scripts/upgrade/mssql/upgrade-1.2.0-to-1.3.0.mssql.sql
index b0f28bb..251e621 100644
--- a/metastore/scripts/upgrade/mssql/upgrade-1.2.0-to-1.3.0.mssql.sql
+++ b/metastore/scripts/upgrade/mssql/upgrade-1.2.0-to-1.3.0.mssql.sql
@@ -11,6 +11,7 @@ SELECT 'Upgrading MetaStore schema from 1.2.0 to 1.3.0' AS MESSAGE;
 :r 015-HIVE-12823.mssql.sql;
 :r 016-HIVE-12831.mssql.sql;
 :r 017-HIVE-12832.mssql.sql;
+:r 020-HIVE-13395.mssql.sql;
 
 UPDATE VERSION SET SCHEMA_VERSION='1.3.0', VERSION_COMMENT='Hive release version 1.3.0' where VER_ID=1;
 SELECT 'Finished upgrading MetaStore schema from 1.2.0 to 1.3.0' AS MESSAGE;

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/mssql/upgrade-2.0.0-to-2.1.0.mssql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mssql/upgrade-2.0.0-to-2.1.0.mssql.sql b/metastore/scripts/upgrade/mssql/upgrade-2.0.0-to-2.1.0.mssql.sql
index 3e5cb30..c796126 100644
--- a/metastore/scripts/upgrade/mssql/upgrade-2.0.0-to-2.1.0.mssql.sql
+++ b/metastore/scripts/upgrade/mssql/upgrade-2.0.0-to-2.1.0.mssql.sql
@@ -2,6 +2,7 @@ SELECT 'Upgrading MetaStore schema from 2.0.0 to 2.1.0' AS MESSAGE;
 
 :r 018-HIVE-12892.mssql.sql;
 :r 019-HIVE-13076.mssql.sql;
+:r 020-HIVE-13395.mssql.sql;
 
 UPDATE VERSION SET SCHEMA_VERSION='2.1.0', VERSION_COMMENT='Hive release version 2.1.0' where VER_ID=1;
 SELECT 'Finished upgrading MetaStore schema from 2.0.0 to 2.1.0' AS MESSAGE;

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/mysql/035-HIVE-13395.mysql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mysql/035-HIVE-13395.mysql.sql b/metastore/scripts/upgrade/mysql/035-HIVE-13395.mysql.sql
new file mode 100644
index 0000000..586caef
--- /dev/null
+++ b/metastore/scripts/upgrade/mysql/035-HIVE-13395.mysql.sql
@@ -0,0 +1,10 @@
+CREATE TABLE WRITE_SET (
+  WS_DATABASE varchar(128) NOT NULL,
+  WS_TABLE varchar(128) NOT NULL,
+  WS_PARTITION varchar(767),
+  WS_TXNID bigint NOT NULL,
+  WS_COMMIT_ID bigint NOT NULL,
+  WS_OPERATION_TYPE char(1) NOT NULL
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
+ALTER TABLE TXN_COMPONENTS ADD TC_OPERATION_TYPE char(1);

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/mysql/hive-schema-2.1.0.mysql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mysql/hive-schema-2.1.0.mysql.sql b/metastore/scripts/upgrade/mysql/hive-schema-2.1.0.mysql.sql
index 466e950..a6b783c 100644
--- a/metastore/scripts/upgrade/mysql/hive-schema-2.1.0.mysql.sql
+++ b/metastore/scripts/upgrade/mysql/hive-schema-2.1.0.mysql.sql
@@ -839,7 +839,7 @@ CREATE INDEX `CONSTRAINTS_PARENT_TABLE_ID_INDEX` ON KEY_CONSTRAINTS (`PARENT_TBL
 -- ----------------------------
 -- Transaction and Lock Tables
 -- ----------------------------
-SOURCE hive-txn-schema-2.0.0.mysql.sql;
+SOURCE hive-txn-schema-2.1.0.mysql.sql;
 
 -- -----------------------------------------------------------------
 -- Record schema version. Should be the last step in the init script

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/mysql/hive-txn-schema-2.1.0.mysql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mysql/hive-txn-schema-2.1.0.mysql.sql b/metastore/scripts/upgrade/mysql/hive-txn-schema-2.1.0.mysql.sql
new file mode 100644
index 0000000..369d6bb
--- /dev/null
+++ b/metastore/scripts/upgrade/mysql/hive-txn-schema-2.1.0.mysql.sql
@@ -0,0 +1,131 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+--
+-- Tables for transaction management
+-- 
+
+CREATE TABLE TXNS (
+  TXN_ID bigint PRIMARY KEY,
+  TXN_STATE char(1) NOT NULL,
+  TXN_STARTED bigint NOT NULL,
+  TXN_LAST_HEARTBEAT bigint NOT NULL,
+  TXN_USER varchar(128) NOT NULL,
+  TXN_HOST varchar(128) NOT NULL,
+  TXN_AGENT_INFO varchar(128),
+  TXN_META_INFO varchar(128),
+  TXN_HEARTBEAT_COUNT int
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
+CREATE TABLE TXN_COMPONENTS (
+  TC_TXNID bigint NOT NULL,
+  TC_DATABASE varchar(128) NOT NULL,
+  TC_TABLE varchar(128) NOT NULL,
+  TC_PARTITION varchar(767),
+  TC_OPERATION_TYPE char(1) NOT NULL,
+  FOREIGN KEY (TC_TXNID) REFERENCES TXNS (TXN_ID)
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
+CREATE TABLE COMPLETED_TXN_COMPONENTS (
+  CTC_TXNID bigint NOT NULL,
+  CTC_DATABASE varchar(128) NOT NULL,
+  CTC_TABLE varchar(128),
+  CTC_PARTITION varchar(767)
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
+CREATE TABLE NEXT_TXN_ID (
+  NTXN_NEXT bigint NOT NULL
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+INSERT INTO NEXT_TXN_ID VALUES(1);
+
+CREATE TABLE HIVE_LOCKS (
+  HL_LOCK_EXT_ID bigint NOT NULL,
+  HL_LOCK_INT_ID bigint NOT NULL,
+  HL_TXNID bigint,
+  HL_DB varchar(128) NOT NULL,
+  HL_TABLE varchar(128),
+  HL_PARTITION varchar(767),
+  HL_LOCK_STATE char(1) not null,
+  HL_LOCK_TYPE char(1) not null,
+  HL_LAST_HEARTBEAT bigint NOT NULL,
+  HL_ACQUIRED_AT bigint,
+  HL_USER varchar(128) NOT NULL,
+  HL_HOST varchar(128) NOT NULL,
+  HL_HEARTBEAT_COUNT int,
+  HL_AGENT_INFO varchar(128),
+  HL_BLOCKEDBY_EXT_ID bigint,
+  HL_BLOCKEDBY_INT_ID bigint,
+  PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID),
+  KEY HIVE_LOCK_TXNID_INDEX (HL_TXNID)
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
+CREATE INDEX HL_TXNID_IDX ON HIVE_LOCKS (HL_TXNID);
+
+CREATE TABLE NEXT_LOCK_ID (
+  NL_NEXT bigint NOT NULL
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+INSERT INTO NEXT_LOCK_ID VALUES(1);
+
+CREATE TABLE COMPACTION_QUEUE (
+  CQ_ID bigint PRIMARY KEY,
+  CQ_DATABASE varchar(128) NOT NULL,
+  CQ_TABLE varchar(128) NOT NULL,
+  CQ_PARTITION varchar(767),
+  CQ_STATE char(1) NOT NULL,
+  CQ_TYPE char(1) NOT NULL,
+  CQ_WORKER_ID varchar(128),
+  CQ_START bigint,
+  CQ_RUN_AS varchar(128),
+  CQ_HIGHEST_TXN_ID bigint,
+  CQ_META_INFO varbinary(2048),
+  CQ_HADOOP_JOB_ID varchar(32)
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
+CREATE TABLE COMPLETED_COMPACTIONS (
+  CC_ID bigint PRIMARY KEY,
+  CC_DATABASE varchar(128) NOT NULL,
+  CC_TABLE varchar(128) NOT NULL,
+  CC_PARTITION varchar(767),
+  CC_STATE char(1) NOT NULL,
+  CC_TYPE char(1) NOT NULL,
+  CC_WORKER_ID varchar(128),
+  CC_START bigint,
+  CC_END bigint,
+  CC_RUN_AS varchar(128),
+  CC_HIGHEST_TXN_ID bigint,
+  CC_META_INFO varbinary(2048),
+  CC_HADOOP_JOB_ID varchar(32)
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
+CREATE TABLE NEXT_COMPACTION_QUEUE_ID (
+  NCQ_NEXT bigint NOT NULL
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1);
+
+CREATE TABLE AUX_TABLE (
+  MT_KEY1 varchar(128) NOT NULL,
+  MT_KEY2 bigint NOT NULL,
+  MT_COMMENT varchar(255),
+  PRIMARY KEY(MT_KEY1, MT_KEY2)
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
+CREATE TABLE WRITE_SET (
+  WS_DATABASE varchar(128) NOT NULL,
+  WS_TABLE varchar(128) NOT NULL,
+  WS_PARTITION varchar(767),
+  WS_TXNID bigint NOT NULL,
+  WS_COMMIT_ID bigint NOT NULL,
+  WS_OPERATION_TYPE char(1) NOT NULL
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/mysql/upgrade-1.2.0-to-1.3.0.mysql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mysql/upgrade-1.2.0-to-1.3.0.mysql.sql b/metastore/scripts/upgrade/mysql/upgrade-1.2.0-to-1.3.0.mysql.sql
index 477c10b..b65aee5 100644
--- a/metastore/scripts/upgrade/mysql/upgrade-1.2.0-to-1.3.0.mysql.sql
+++ b/metastore/scripts/upgrade/mysql/upgrade-1.2.0-to-1.3.0.mysql.sql
@@ -11,6 +11,7 @@ SOURCE 029-HIVE-12822.mysql.sql;
 SOURCE 030-HIVE-12823.mysql.sql;
 SOURCE 031-HIVE-12831.mysql.sql;
 SOURCE 032-HIVE-12832.mysql.sql;
+SOURCE 035-HIVE-13395.mysql.sql;
 
 UPDATE VERSION SET SCHEMA_VERSION='1.3.0', VERSION_COMMENT='Hive release version 1.3.0' where VER_ID=1;
 SELECT 'Finished upgrading MetaStore schema from 1.2.0 to 1.3.0' AS ' ';

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/mysql/upgrade-2.0.0-to-2.1.0.mysql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mysql/upgrade-2.0.0-to-2.1.0.mysql.sql b/metastore/scripts/upgrade/mysql/upgrade-2.0.0-to-2.1.0.mysql.sql
index eb21f73..c3f83b3 100644
--- a/metastore/scripts/upgrade/mysql/upgrade-2.0.0-to-2.1.0.mysql.sql
+++ b/metastore/scripts/upgrade/mysql/upgrade-2.0.0-to-2.1.0.mysql.sql
@@ -2,6 +2,7 @@ SELECT 'Upgrading MetaStore schema from 2.0.0 to 2.1.0' AS ' ';
 
 SOURCE 033-HIVE-12892.mysql.sql;
 SOURCE 034-HIVE-13076.mysql.sql;
+SOURCE 035-HIVE-13395.mysql.sql;
 
 UPDATE VERSION SET SCHEMA_VERSION='2.1.0', VERSION_COMMENT='Hive release version 2.1.0' where VER_ID=1;
 SELECT 'Finished upgrading MetaStore schema from 2.0.0 to 2.1.0' AS ' ';

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/oracle/035-HIVE-13395.oracle.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/oracle/035-HIVE-13395.oracle.sql b/metastore/scripts/upgrade/oracle/035-HIVE-13395.oracle.sql
new file mode 100644
index 0000000..ad1bbd9
--- /dev/null
+++ b/metastore/scripts/upgrade/oracle/035-HIVE-13395.oracle.sql
@@ -0,0 +1,10 @@
+CREATE TABLE WRITE_SET (
+  WS_DATABASE varchar2(128) NOT NULL,
+  WS_TABLE varchar2(128) NOT NULL,
+  WS_PARTITION varchar2(767),
+  WS_TXNID number(19) NOT NULL,
+  WS_COMMIT_ID number(19) NOT NULL,
+  WS_OPERATION_TYPE char(1) NOT NULL
+);
+
+ALTER TABLE TXN_COMPONENTS ADD TC_OPERATION_TYPE char(1);

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/oracle/hive-schema-2.1.0.oracle.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/oracle/hive-schema-2.1.0.oracle.sql b/metastore/scripts/upgrade/oracle/hive-schema-2.1.0.oracle.sql
index f57e588..d003a16 100644
--- a/metastore/scripts/upgrade/oracle/hive-schema-2.1.0.oracle.sql
+++ b/metastore/scripts/upgrade/oracle/hive-schema-2.1.0.oracle.sql
@@ -808,7 +808,7 @@ CREATE INDEX CONSTRAINTS_PARENT_TBL_ID_INDEX ON KEY_CONSTRAINTS(PARENT_TBL_ID);
 ------------------------------
 -- Transaction and lock tables
 ------------------------------
-@hive-txn-schema-2.0.0.oracle.sql;
+@hive-txn-schema-2.1.0.oracle.sql;
 
 -- -----------------------------------------------------------------
 -- Record schema version. Should be the last step in the init script

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/oracle/hive-txn-schema-1.3.0.oracle.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/oracle/hive-txn-schema-1.3.0.oracle.sql b/metastore/scripts/upgrade/oracle/hive-txn-schema-1.3.0.oracle.sql
index 788741a..199ff4c 100644
--- a/metastore/scripts/upgrade/oracle/hive-txn-schema-1.3.0.oracle.sql
+++ b/metastore/scripts/upgrade/oracle/hive-txn-schema-1.3.0.oracle.sql
@@ -33,7 +33,8 @@ CREATE TABLE TXN_COMPONENTS (
   TC_TXNID NUMBER(19) REFERENCES TXNS (TXN_ID),
   TC_DATABASE VARCHAR2(128) NOT NULL,
   TC_TABLE VARCHAR2(128),
-  TC_PARTITION VARCHAR2(767) NULL
+  TC_PARTITION VARCHAR2(767) NULL,
+  TC_OPERATION_TYPE char(1) NOT NULL
 ) ROWDEPENDENCIES;
 
 CREATE TABLE COMPLETED_TXN_COMPONENTS (
@@ -118,3 +119,12 @@ CREATE TABLE AUX_TABLE (
   PRIMARY KEY(MT_KEY1, MT_KEY2)
 );
 
+CREATE TABLE WRITE_SET (
+  WS_DATABASE varchar2(128) NOT NULL,
+  WS_TABLE varchar2(128) NOT NULL,
+  WS_PARTITION varchar2(767),
+  WS_TXNID number(19) NOT NULL,
+  WS_COMMIT_ID number(19) NOT NULL,
+  WS_OPERATION_TYPE char(1) NOT NULL
+);
+

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/oracle/hive-txn-schema-2.1.0.oracle.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/oracle/hive-txn-schema-2.1.0.oracle.sql b/metastore/scripts/upgrade/oracle/hive-txn-schema-2.1.0.oracle.sql
new file mode 100644
index 0000000..d39baab
--- /dev/null
+++ b/metastore/scripts/upgrade/oracle/hive-txn-schema-2.1.0.oracle.sql
@@ -0,0 +1,129 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the License); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an AS IS BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+--
+-- Tables for transaction management
+-- 
+
+CREATE TABLE TXNS (
+  TXN_ID NUMBER(19) PRIMARY KEY,
+  TXN_STATE char(1) NOT NULL,
+  TXN_STARTED NUMBER(19) NOT NULL,
+  TXN_LAST_HEARTBEAT NUMBER(19) NOT NULL,
+  TXN_USER varchar(128) NOT NULL,
+  TXN_HOST varchar(128) NOT NULL,
+  TXN_AGENT_INFO varchar2(128),
+  TXN_META_INFO varchar2(128),
+  TXN_HEARTBEAT_COUNT number(10)
+) ROWDEPENDENCIES;
+
+CREATE TABLE TXN_COMPONENTS (
+  TC_TXNID NUMBER(19) REFERENCES TXNS (TXN_ID),
+  TC_DATABASE VARCHAR2(128) NOT NULL,
+  TC_TABLE VARCHAR2(128),
+  TC_PARTITION VARCHAR2(767) NULL,
+  TC_OPERATION_TYPE char(1) NOT NULL
+) ROWDEPENDENCIES;
+
+CREATE TABLE COMPLETED_TXN_COMPONENTS (
+  CTC_TXNID NUMBER(19),
+  CTC_DATABASE varchar(128) NOT NULL,
+  CTC_TABLE varchar(128),
+  CTC_PARTITION varchar(767)
+) ROWDEPENDENCIES;
+
+CREATE TABLE NEXT_TXN_ID (
+  NTXN_NEXT NUMBER(19) NOT NULL
+);
+INSERT INTO NEXT_TXN_ID VALUES(1);
+
+CREATE TABLE HIVE_LOCKS (
+  HL_LOCK_EXT_ID NUMBER(19) NOT NULL,
+  HL_LOCK_INT_ID NUMBER(19) NOT NULL,
+  HL_TXNID NUMBER(19),
+  HL_DB VARCHAR2(128) NOT NULL,
+  HL_TABLE VARCHAR2(128),
+  HL_PARTITION VARCHAR2(767),
+  HL_LOCK_STATE CHAR(1) NOT NULL,
+  HL_LOCK_TYPE CHAR(1) NOT NULL,
+  HL_LAST_HEARTBEAT NUMBER(19) NOT NULL,
+  HL_ACQUIRED_AT NUMBER(19),
+  HL_USER varchar(128) NOT NULL,
+  HL_HOST varchar(128) NOT NULL,
+  HL_HEARTBEAT_COUNT number(10),
+  HL_AGENT_INFO varchar2(128),
+  HL_BLOCKEDBY_EXT_ID number(19),
+  HL_BLOCKEDBY_INT_ID number(19),
+  PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID)
+) ROWDEPENDENCIES;
+
+CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS (HL_TXNID);
+
+CREATE TABLE NEXT_LOCK_ID (
+  NL_NEXT NUMBER(19) NOT NULL
+);
+INSERT INTO NEXT_LOCK_ID VALUES(1);
+
+CREATE TABLE COMPACTION_QUEUE (
+  CQ_ID NUMBER(19) PRIMARY KEY,
+  CQ_DATABASE varchar(128) NOT NULL,
+  CQ_TABLE varchar(128) NOT NULL,
+  CQ_PARTITION varchar(767),
+  CQ_STATE char(1) NOT NULL,
+  CQ_TYPE char(1) NOT NULL,
+  CQ_WORKER_ID varchar(128),
+  CQ_START NUMBER(19),
+  CQ_RUN_AS varchar(128),
+  CQ_HIGHEST_TXN_ID NUMBER(19),
+  CQ_META_INFO BLOB,
+  CQ_HADOOP_JOB_ID varchar2(32)
+) ROWDEPENDENCIES;
+
+CREATE TABLE NEXT_COMPACTION_QUEUE_ID (
+  NCQ_NEXT NUMBER(19) NOT NULL
+);
+INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1);
+
+CREATE TABLE COMPLETED_COMPACTIONS (
+  CC_ID NUMBER(19) PRIMARY KEY,
+  CC_DATABASE varchar(128) NOT NULL,
+  CC_TABLE varchar(128) NOT NULL,
+  CC_PARTITION varchar(767),
+  CC_STATE char(1) NOT NULL,
+  CC_TYPE char(1) NOT NULL,
+  CC_WORKER_ID varchar(128),
+  CC_START NUMBER(19),
+  CC_END NUMBER(19),
+  CC_RUN_AS varchar(128),
+  CC_HIGHEST_TXN_ID NUMBER(19),
+  CC_META_INFO BLOB,
+  CC_HADOOP_JOB_ID varchar2(32)
+) ROWDEPENDENCIES;
+
+CREATE TABLE AUX_TABLE (
+  MT_KEY1 varchar2(128) NOT NULL,
+  MT_KEY2 number(19) NOT NULL,
+  MT_COMMENT varchar2(255),
+  PRIMARY KEY(MT_KEY1, MT_KEY2)
+);
+
+CREATE TABLE WRITE_SET (
+  WS_DATABASE varchar2(128) NOT NULL,
+  WS_TABLE varchar2(128) NOT NULL,
+  WS_PARTITION varchar2(767),
+  WS_TXNID number(19) NOT NULL,
+  WS_COMMIT_ID number(19) NOT NULL,
+  WS_OPERATION_TYPE char(1) NOT NULL
+);

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/oracle/upgrade-1.2.0-to-1.3.0.oracle.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/oracle/upgrade-1.2.0-to-1.3.0.oracle.sql b/metastore/scripts/upgrade/oracle/upgrade-1.2.0-to-1.3.0.oracle.sql
index 94ee2c4..5939b34 100644
--- a/metastore/scripts/upgrade/oracle/upgrade-1.2.0-to-1.3.0.oracle.sql
+++ b/metastore/scripts/upgrade/oracle/upgrade-1.2.0-to-1.3.0.oracle.sql
@@ -11,6 +11,7 @@ SELECT 'Upgrading MetaStore schema from 1.2.0 to 1.3.0' AS Status from dual;
 @030-HIVE-12823.oracle.sql;
 @031-HIVE-12381.oracle.sql;
 @032-HIVE-12832.oracle.sql;
+@035-HIVE-13395.oracle.sql;
 
 UPDATE VERSION SET SCHEMA_VERSION='1.3.0', VERSION_COMMENT='Hive release version 1.3.0' where VER_ID=1;
 SELECT 'Finished upgrading MetaStore schema from 1.2.0 to 1.3.0' AS Status from dual;

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/oracle/upgrade-2.0.0-to-2.1.0.oracle.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/oracle/upgrade-2.0.0-to-2.1.0.oracle.sql b/metastore/scripts/upgrade/oracle/upgrade-2.0.0-to-2.1.0.oracle.sql
index 8c065a1..a226d9a 100644
--- a/metastore/scripts/upgrade/oracle/upgrade-2.0.0-to-2.1.0.oracle.sql
+++ b/metastore/scripts/upgrade/oracle/upgrade-2.0.0-to-2.1.0.oracle.sql
@@ -2,6 +2,7 @@ SELECT 'Upgrading MetaStore schema from 2.0.0 to 2.1.0' AS Status from dual;
 
 @033-HIVE-12892.oracle.sql;
 @034-HIVE-13076.oracle.sql;
+@035-HIVE-13395.oracle.sql;
 
 UPDATE VERSION SET SCHEMA_VERSION='2.1.0', VERSION_COMMENT='Hive release version 2.1.0' where VER_ID=1;
 SELECT 'Finished upgrading MetaStore schema from 2.0.0 to 2.1.0' AS Status from dual;

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/postgres/034-HIVE-13395.postgres.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/postgres/034-HIVE-13395.postgres.sql b/metastore/scripts/upgrade/postgres/034-HIVE-13395.postgres.sql
new file mode 100644
index 0000000..4dda283
--- /dev/null
+++ b/metastore/scripts/upgrade/postgres/034-HIVE-13395.postgres.sql
@@ -0,0 +1,10 @@
+CREATE TABLE WRITE_SET (
+  WS_DATABASE varchar(128) NOT NULL,
+  WS_TABLE varchar(128) NOT NULL,
+  WS_PARTITION varchar(767),
+  WS_TXNID bigint NOT NULL,
+  WS_COMMIT_ID bigint NOT NULL,
+  WS_OPERATION_TYPE char(1) NOT NULL
+);
+
+ALTER TABLE TXN_COMPONENTS ADD TC_OPERATION_TYPE char(1);
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/postgres/hive-schema-2.1.0.postgres.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/postgres/hive-schema-2.1.0.postgres.sql b/metastore/scripts/upgrade/postgres/hive-schema-2.1.0.postgres.sql
index e209489..43e984c 100644
--- a/metastore/scripts/upgrade/postgres/hive-schema-2.1.0.postgres.sql
+++ b/metastore/scripts/upgrade/postgres/hive-schema-2.1.0.postgres.sql
@@ -1480,7 +1480,7 @@ GRANT ALL ON SCHEMA public TO PUBLIC;
 ------------------------------
 -- Transaction and lock tables
 ------------------------------
-\i hive-txn-schema-2.0.0.postgres.sql;
+\i hive-txn-schema-2.1.0.postgres.sql;
 
 -- -----------------------------------------------------------------
 -- Record schema version. Should be the last step in the init script

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/postgres/hive-txn-schema-1.3.0.postgres.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/postgres/hive-txn-schema-1.3.0.postgres.sql b/metastore/scripts/upgrade/postgres/hive-txn-schema-1.3.0.postgres.sql
index b2fc1a8..b606f81 100644
--- a/metastore/scripts/upgrade/postgres/hive-txn-schema-1.3.0.postgres.sql
+++ b/metastore/scripts/upgrade/postgres/hive-txn-schema-1.3.0.postgres.sql
@@ -33,7 +33,8 @@ CREATE TABLE TXN_COMPONENTS (
   TC_TXNID bigint REFERENCES TXNS (TXN_ID),
   TC_DATABASE varchar(128) NOT NULL,
   TC_TABLE varchar(128),
-  TC_PARTITION varchar(767) DEFAULT NULL
+  TC_PARTITION varchar(767) DEFAULT NULL,
+  TC_OPERATION_TYPE char(1) NOT NULL
 );
 
 CREATE TABLE COMPLETED_TXN_COMPONENTS (
@@ -118,4 +119,12 @@ CREATE TABLE AUX_TABLE (
   PRIMARY KEY(MT_KEY1, MT_KEY2)
 );
 
+CREATE TABLE WRITE_SET (
+  WS_DATABASE varchar(128) NOT NULL,
+  WS_TABLE varchar(128) NOT NULL,
+  WS_PARTITION varchar(767),
+  WS_TXNID bigint NOT NULL,
+  WS_COMMIT_ID bigint NOT NULL,
+  WS_OPERATION_TYPE char(1) NOT NULL
+);
 

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/postgres/hive-txn-schema-2.1.0.postgres.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/postgres/hive-txn-schema-2.1.0.postgres.sql b/metastore/scripts/upgrade/postgres/hive-txn-schema-2.1.0.postgres.sql
new file mode 100644
index 0000000..262b93e
--- /dev/null
+++ b/metastore/scripts/upgrade/postgres/hive-txn-schema-2.1.0.postgres.sql
@@ -0,0 +1,129 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+--
+-- Tables for transaction management
+-- 
+
+CREATE TABLE TXNS (
+  TXN_ID bigint PRIMARY KEY,
+  TXN_STATE char(1) NOT NULL,
+  TXN_STARTED bigint NOT NULL,
+  TXN_LAST_HEARTBEAT bigint NOT NULL,
+  TXN_USER varchar(128) NOT NULL,
+  TXN_HOST varchar(128) NOT NULL,
+  TXN_AGENT_INFO varchar(128),
+  TXN_META_INFO varchar(128),
+  TXN_HEARTBEAT_COUNT integer
+);
+
+CREATE TABLE TXN_COMPONENTS (
+  TC_TXNID bigint REFERENCES TXNS (TXN_ID),
+  TC_DATABASE varchar(128) NOT NULL,
+  TC_TABLE varchar(128),
+  TC_PARTITION varchar(767) DEFAULT NULL,
+  TC_OPERATION_TYPE char(1) NOT NULL
+);
+
+CREATE TABLE COMPLETED_TXN_COMPONENTS (
+  CTC_TXNID bigint,
+  CTC_DATABASE varchar(128) NOT NULL,
+  CTC_TABLE varchar(128),
+  CTC_PARTITION varchar(767)
+);
+
+CREATE TABLE NEXT_TXN_ID (
+  NTXN_NEXT bigint NOT NULL
+);
+INSERT INTO NEXT_TXN_ID VALUES(1);
+
+CREATE TABLE HIVE_LOCKS (
+  HL_LOCK_EXT_ID bigint NOT NULL,
+  HL_LOCK_INT_ID bigint NOT NULL,
+  HL_TXNID bigint,
+  HL_DB varchar(128) NOT NULL,
+  HL_TABLE varchar(128),
+  HL_PARTITION varchar(767) DEFAULT NULL,
+  HL_LOCK_STATE char(1) NOT NULL,
+  HL_LOCK_TYPE char(1) NOT NULL,
+  HL_LAST_HEARTBEAT bigint NOT NULL,
+  HL_ACQUIRED_AT bigint,
+  HL_USER varchar(128) NOT NULL,
+  HL_HOST varchar(128) NOT NULL,
+  HL_HEARTBEAT_COUNT integer,
+  HL_AGENT_INFO varchar(128),
+  HL_BLOCKEDBY_EXT_ID bigint,
+  HL_BLOCKEDBY_INT_ID bigint,
+  PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID)
+); 
+
+CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS USING hash (HL_TXNID);
+
+CREATE TABLE NEXT_LOCK_ID (
+  NL_NEXT bigint NOT NULL
+);
+INSERT INTO NEXT_LOCK_ID VALUES(1);
+
+CREATE TABLE COMPACTION_QUEUE (
+  CQ_ID bigint PRIMARY KEY,
+  CQ_DATABASE varchar(128) NOT NULL,
+  CQ_TABLE varchar(128) NOT NULL,
+  CQ_PARTITION varchar(767),
+  CQ_STATE char(1) NOT NULL,
+  CQ_TYPE char(1) NOT NULL,
+  CQ_WORKER_ID varchar(128),
+  CQ_START bigint,
+  CQ_RUN_AS varchar(128),
+  CQ_HIGHEST_TXN_ID bigint,
+  CQ_META_INFO bytea,
+  CQ_HADOOP_JOB_ID varchar(32)
+);
+
+CREATE TABLE NEXT_COMPACTION_QUEUE_ID (
+  NCQ_NEXT bigint NOT NULL
+);
+INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1);
+
+CREATE TABLE COMPLETED_COMPACTIONS (
+  CC_ID bigint PRIMARY KEY,
+  CC_DATABASE varchar(128) NOT NULL,
+  CC_TABLE varchar(128) NOT NULL,
+  CC_PARTITION varchar(767),
+  CC_STATE char(1) NOT NULL,
+  CC_TYPE char(1) NOT NULL,
+  CC_WORKER_ID varchar(128),
+  CC_START bigint,
+  CC_END bigint,
+  CC_RUN_AS varchar(128),
+  CC_HIGHEST_TXN_ID bigint,
+  CC_META_INFO bytea,
+  CC_HADOOP_JOB_ID varchar(32)
+);
+
+CREATE TABLE AUX_TABLE (
+  MT_KEY1 varchar(128) NOT NULL,
+  MT_KEY2 bigint NOT NULL,
+  MT_COMMENT varchar(255),
+  PRIMARY KEY(MT_KEY1, MT_KEY2)
+);
+
+CREATE TABLE WRITE_SET (
+  WS_DATABASE varchar(128) NOT NULL,
+  WS_TABLE varchar(128) NOT NULL,
+  WS_PARTITION varchar(767),
+  WS_TXNID bigint NOT NULL,
+  WS_COMMIT_ID bigint NOT NULL,
+  WS_OPERATION_TYPE char(1) NOT NULL
+);

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/postgres/upgrade-1.2.0-to-1.3.0.postgres.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/postgres/upgrade-1.2.0-to-1.3.0.postgres.sql b/metastore/scripts/upgrade/postgres/upgrade-1.2.0-to-1.3.0.postgres.sql
index 6eb5620..b1bcac0 100644
--- a/metastore/scripts/upgrade/postgres/upgrade-1.2.0-to-1.3.0.postgres.sql
+++ b/metastore/scripts/upgrade/postgres/upgrade-1.2.0-to-1.3.0.postgres.sql
@@ -11,6 +11,7 @@ SELECT 'Upgrading MetaStore schema from 1.2.0 to 1.3.0';
 \i 029-HIVE-12823.postgres.sql;
 \i 030-HIVE-12831.postgres.sql;
 \i 031-HIVE-12832.postgres.sql;
+\i 034-HIVE-13395.postgres.sql;
 
 UPDATE "VERSION" SET "SCHEMA_VERSION"='1.3.0', "VERSION_COMMENT"='Hive release version 1.3.0' where "VER_ID"=1;
 SELECT 'Finished upgrading MetaStore schema from 1.2.0 to 1.3.0';

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/scripts/upgrade/postgres/upgrade-2.0.0-to-2.1.0.postgres.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/postgres/upgrade-2.0.0-to-2.1.0.postgres.sql b/metastore/scripts/upgrade/postgres/upgrade-2.0.0-to-2.1.0.postgres.sql
index e96a6ec..7fc603f 100644
--- a/metastore/scripts/upgrade/postgres/upgrade-2.0.0-to-2.1.0.postgres.sql
+++ b/metastore/scripts/upgrade/postgres/upgrade-2.0.0-to-2.1.0.postgres.sql
@@ -2,6 +2,7 @@ SELECT 'Upgrading MetaStore schema from 2.0.0 to 2.1.0';
 
 \i 032-HIVE-12892.postgres.sql;
 \i 033-HIVE-13076.postgres.sql;
+\i 034-HIVE-13395.postgres.sql;
 
 UPDATE "VERSION" SET "SCHEMA_VERSION"='2.1.0', "VERSION_COMMENT"='Hive release version 2.1.0' where "VER_ID"=1;
 SELECT 'Finished upgrading MetaStore schema from 2.0.0 to 2.1.0';

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 9a09e7a..044b960 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -6748,6 +6748,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
     }
     startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService"));
     startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidCompactionHistoryService"));
+    startHouseKeeperService(conf, Class.forName("org.apache.hadoop.hive.ql.txn.AcidWriteSetService"));
   }
   private static void startHouseKeeperService(HiveConf conf, Class c) throws Exception {
     //todo: when metastore adds orderly-shutdown logic, houseKeeper.stop()
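(Aside: AcidWriteSetService is the housekeeping counterpart to the WRITE_SET table. A hedged sketch of the kind of GC such a service performs, hypothetical since the real logic is delegated to the TxnStore: rows whose commit id precedes the lowest open transaction can never conflict with a live transaction again, so they are safe to reap.)

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

public class WriteSetReaperSketch {
  // Deletes write-set entries that no open transaction could still conflict
  // with; returns the number of rows pruned.
  public static int prune(Connection conn, long lowestOpenTxnId) throws SQLException {
    try (Statement stmt = conn.createStatement()) {
      return stmt.executeUpdate(
          "DELETE FROM WRITE_SET WHERE WS_COMMIT_ID < " + lowestOpenTxnId);
    }
  }
}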

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index c82d23a..facce54 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -21,11 +21,13 @@ import java.sql.Connection;
 import java.sql.Driver;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.SQLTransactionRollbackException;
 import java.sql.Statement;
 import java.util.Properties;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -82,7 +84,8 @@ public final class TxnDbUtil {
           "  TC_TXNID bigint REFERENCES TXNS (TXN_ID)," +
           "  TC_DATABASE varchar(128) NOT NULL," +
           "  TC_TABLE varchar(128)," +
-          "  TC_PARTITION varchar(767))");
+          "  TC_PARTITION varchar(767)," +
+          "  TC_OPERATION_TYPE char(1) NOT NULL)");
       stmt.execute("CREATE TABLE COMPLETED_TXN_COMPONENTS (" +
           "  CTC_TXNID bigint," +
           "  CTC_DATABASE varchar(128) NOT NULL," +
@@ -146,18 +149,24 @@ public final class TxnDbUtil {
         " CC_HADOOP_JOB_ID varchar(32))");
       
       stmt.execute("CREATE TABLE AUX_TABLE (" +
-        "  MT_KEY1 varchar(128) NOT NULL," +
-        "  MT_KEY2 bigint NOT NULL," +
-        "  MT_COMMENT varchar(255)," +
-        "  PRIMARY KEY(MT_KEY1, MT_KEY2)" +
-        ")");
-
-      conn.commit();
+        " MT_KEY1 varchar(128) NOT NULL," +
+        " MT_KEY2 bigint NOT NULL," +
+        " MT_COMMENT varchar(255)," +
+        " PRIMARY KEY(MT_KEY1, MT_KEY2))");
+      
+      stmt.execute("CREATE TABLE WRITE_SET (" +
+        " WS_DATABASE varchar(128) NOT NULL," +
+        " WS_TABLE varchar(128) NOT NULL," +
+        " WS_PARTITION varchar(767)," +
+        " WS_TXNID bigint NOT NULL," +
+        " WS_COMMIT_ID bigint NOT NULL," +
+        " WS_OPERATION_TYPE char(1) NOT NULL)"
+      );
     } catch (SQLException e) {
       try {
         conn.rollback();
       } catch (SQLException re) {
-        System.err.println("Error rolling back: " + re.getMessage());
+        LOG.error("Error rolling back: " + re.getMessage());
       }
 
       // This might be a deadlock, if so, let's retry
@@ -174,41 +183,60 @@ public final class TxnDbUtil {
   }
 
   public static void cleanDb() throws Exception {
-    Connection conn = null;
-    Statement stmt = null;
-    try {
-      conn = getConnection();
-      stmt = conn.createStatement();
-
-      // We want to try these, whether they succeed or fail.
+    int retryCount = 0;
+    while(++retryCount <= 3) {
+      boolean success = true;
+      Connection conn = null;
+      Statement stmt = null;
       try {
-        stmt.execute("DROP INDEX HL_TXNID_INDEX");
-      } catch (Exception e) {
-        System.err.println("Unable to drop index HL_TXNID_INDEX " + e.getMessage());
-      }
+        conn = getConnection();
+        stmt = conn.createStatement();
 
-      dropTable(stmt, "TXN_COMPONENTS");
-      dropTable(stmt, "COMPLETED_TXN_COMPONENTS");
-      dropTable(stmt, "TXNS");
-      dropTable(stmt, "NEXT_TXN_ID");
-      dropTable(stmt, "HIVE_LOCKS");
-      dropTable(stmt, "NEXT_LOCK_ID");
-      dropTable(stmt, "COMPACTION_QUEUE");
-      dropTable(stmt, "NEXT_COMPACTION_QUEUE_ID");
-      dropTable(stmt, "COMPLETED_COMPACTIONS");
-      dropTable(stmt, "AUX_TABLE");
-      conn.commit();
-    } finally {
-      closeResources(conn, stmt, null);
+        // We want to try these, whether they succeed or fail.
+        try {
+          stmt.execute("DROP INDEX HL_TXNID_INDEX");
+        } catch (SQLException e) {
+          if(!("42X65".equals(e.getSQLState()) && 30000 == e.getErrorCode())) {
+            //42X65/30000 means the index doesn't exist
+            LOG.error("Unable to drop index HL_TXNID_INDEX " + e.getMessage() +
+              " State=" + e.getSQLState() + " code=" + e.getErrorCode() + " retryCount=" + retryCount);
+            success = false;
+          }
+        }
+
+        success &= dropTable(stmt, "TXN_COMPONENTS", retryCount);
+        success &= dropTable(stmt, "COMPLETED_TXN_COMPONENTS", retryCount);
+        success &= dropTable(stmt, "TXNS", retryCount);
+        success &= dropTable(stmt, "NEXT_TXN_ID", retryCount);
+        success &= dropTable(stmt, "HIVE_LOCKS", retryCount);
+        success &= dropTable(stmt, "NEXT_LOCK_ID", retryCount);
+        success &= dropTable(stmt, "COMPACTION_QUEUE", retryCount);
+        success &= dropTable(stmt, "NEXT_COMPACTION_QUEUE_ID", retryCount);
+        success &= dropTable(stmt, "COMPLETED_COMPACTIONS", retryCount);
+        success &= dropTable(stmt, "AUX_TABLE", retryCount);
+        success &= dropTable(stmt, "WRITE_SET", retryCount);
+      } finally {
+        closeResources(conn, stmt, null);
+      }
+      if(success) {
+        return;
+      }
     }
   }
 
-  private static void dropTable(Statement stmt, String name) {
+  private static boolean dropTable(Statement stmt, String name, int retryCount) throws SQLException {
     try {
       stmt.execute("DROP TABLE " + name);
-    } catch (Exception e) {
-      System.err.println("Unable to drop table " + name + ": " + e.getMessage());
+      return true;
+    } catch (SQLException e) {
+      if("42Y55".equals(e.getSQLState()) && 30000 == e.getErrorCode()) {
+        //failed because object doesn't exist
+        return true;
+      }
+      LOG.error("Unable to drop table " + name + ": " + e.getMessage() +
+        " State=" + e.getSQLState() + " code=" + e.getErrorCode() + " retryCount=" + retryCount);
     }
+    return false;
   }
 
   /**
@@ -259,6 +287,32 @@ public final class TxnDbUtil {
       closeResources(conn, stmt, rs);
     }
   }
+  @VisibleForTesting
+  public static String queryToString(String query) throws Exception {
+    Connection conn = null;
+    Statement stmt = null;
+    ResultSet rs = null;
+    StringBuilder sb = new StringBuilder();
+    try {
+      conn = getConnection();
+      stmt = conn.createStatement();
+      rs = stmt.executeQuery(query);
+      ResultSetMetaData rsmd = rs.getMetaData();
+      for(int colPos = 1; colPos <= rsmd.getColumnCount(); colPos++) {
+        sb.append(rsmd.getColumnName(colPos)).append("   ");
+      }
+      sb.append('\n');
+      while(rs.next()) {
+        for (int colPos = 1; colPos <= rsmd.getColumnCount(); colPos++) {
+          sb.append(rs.getObject(colPos)).append("   ");
+        }
+        sb.append('\n');
+      }
+    } finally {
+      closeResources(conn, stmt, rs);
+    }
+    return sb.toString();
+  }
 
   static Connection getConnection() throws Exception {
     HiveConf conf = new HiveConf();
@@ -272,7 +326,7 @@ public final class TxnDbUtil {
     prop.setProperty("user", user);
     prop.setProperty("password", passwd);
     Connection conn = driver.connect(driverUrl, prop);
-    conn.setAutoCommit(false);
+    conn.setAutoCommit(true);
     return conn;
   }
 
@@ -281,7 +335,7 @@ public final class TxnDbUtil {
       try {
         rs.close();
       } catch (SQLException e) {
-        System.err.println("Error closing ResultSet: " + e.getMessage());
+        LOG.error("Error closing ResultSet: " + e.getMessage());
       }
     }
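
The new queryToString() helper above is handy for dumping metastore state from tests, e.g.
to inspect the WRITE_SET table this patch introduces. A minimal usage sketch:

    // from a test: the first line is the column names, then one row per line
    String writeSets = TxnDbUtil.queryToString("select * from WRITE_SET");
    System.out.println(writeSets);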
 


[09/13] hive git commit: HIVE-13701: LLAP: Use different prefix for llap task scheduler metrics (Prasanth Jayachandran reviewed by Sergey Shelukhin)

Posted by jd...@apache.org.
HIVE-13701: LLAP: Use different prefix for llap task scheduler metrics (Prasanth Jayachandran reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0cc40456
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0cc40456
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0cc40456

Branch: refs/heads/llap
Commit: 0cc40456586aa5f3c54a34ceaf65eaef9a3d311b
Parents: 3517a99
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Thu May 5 21:43:48 2016 -0500
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Thu May 5 21:43:48 2016 -0500

----------------------------------------------------------------------
 ...doop-metrics2-llapdaemon.properties.template | 50 ++++++++++++++++++++
 ...trics2-llaptaskscheduler.properties.template | 50 ++++++++++++++++++++
 .../hadoop-metrics2.properties.template         | 50 --------------------
 .../tezplugins/LlapTaskSchedulerService.java    |  2 +-
 .../metrics/LlapTaskSchedulerMetrics.java       |  6 +--
 5 files changed, 104 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/0cc40456/llap-server/src/main/resources/hadoop-metrics2-llapdaemon.properties.template
----------------------------------------------------------------------
diff --git a/llap-server/src/main/resources/hadoop-metrics2-llapdaemon.properties.template b/llap-server/src/main/resources/hadoop-metrics2-llapdaemon.properties.template
new file mode 100644
index 0000000..994acaa
--- /dev/null
+++ b/llap-server/src/main/resources/hadoop-metrics2-llapdaemon.properties.template
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#}
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# syntax: [prefix].[source|sink].[instance].[options]
+# See javadoc of package-info.java for org.apache.hadoop.metrics2 for details
+
+#*.sink.file.class=org.apache.hadoop.metrics2.sink.FileSink
+# default sampling period, in seconds
+#*.sink.file.period=10
+
+# *.sink.timeline.class=org.apache.hadoop.metrics2.sink.timeline.HadoopTimelineMetricsSink
+# *.sink.timeline.period=60
+
+# llapdaemon metrics for all contexts (jvm,queue,executors,cache) will go to this file
+# llapdaemon.sink.file.filename=llapdaemon-metrics.out
+
+# to configure separate files per context define following for each context
+# llapdaemon.sink.file_jvm.class=org.apache.hadoop.metrics2.sink.FileSink
+# llapdaemon.sink.file_jvm.context=jvm
+# llapdaemon.sink.file_jvm.filename=llapdaemon-jvm-metrics.out

http://git-wip-us.apache.org/repos/asf/hive/blob/0cc40456/llap-server/src/main/resources/hadoop-metrics2-llaptaskscheduler.properties.template
----------------------------------------------------------------------
diff --git a/llap-server/src/main/resources/hadoop-metrics2-llaptaskscheduler.properties.template b/llap-server/src/main/resources/hadoop-metrics2-llaptaskscheduler.properties.template
new file mode 100644
index 0000000..5cf71a7
--- /dev/null
+++ b/llap-server/src/main/resources/hadoop-metrics2-llaptaskscheduler.properties.template
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#}
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# syntax: [prefix].[source|sink].[instance].[options]
+# See javadoc of package-info.java for org.apache.hadoop.metrics2 for details
+
+#*.sink.file.class=org.apache.hadoop.metrics2.sink.FileSink
+# default sampling period, in seconds
+#*.sink.file.period=10
+
+# *.sink.timeline.class=org.apache.hadoop.metrics2.sink.timeline.HadoopTimelineMetricsSink
+# *.sink.timeline.period=60
+
+# llaptaskscheduler metrics for all contexts will go to this file
+# llaptaskscheduler.sink.file.filename=llaptaskscheduler-metrics.out
+
+# to configure separate files per context define following for each context
+# llaptaskscheduler.sink.file_jvm.class=org.apache.hadoop.metrics2.sink.FileSink
+# llaptaskscheduler.sink.file_jvm.context=jvm
+# llaptaskscheduler.sink.file_jvm.filename=llaptaskscheduler-jvm-metrics.out
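
To actually emit task scheduler metrics to a file, the commented sink lines above would be
uncommented along these lines (a sketch; the file name and period are arbitrary):

    llaptaskscheduler.sink.file.class=org.apache.hadoop.metrics2.sink.FileSink
    llaptaskscheduler.sink.file.period=10
    llaptaskscheduler.sink.file.filename=llaptaskscheduler-metrics.out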

http://git-wip-us.apache.org/repos/asf/hive/blob/0cc40456/llap-server/src/main/resources/hadoop-metrics2.properties.template
----------------------------------------------------------------------
diff --git a/llap-server/src/main/resources/hadoop-metrics2.properties.template b/llap-server/src/main/resources/hadoop-metrics2.properties.template
deleted file mode 100644
index 994acaa..0000000
--- a/llap-server/src/main/resources/hadoop-metrics2.properties.template
+++ /dev/null
@@ -1,50 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#}
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# syntax: [prefix].[source|sink].[instance].[options]
-# See javadoc of package-info.java for org.apache.hadoop.metrics2 for details
-
-#*.sink.file.class=org.apache.hadoop.metrics2.sink.FileSink
-# default sampling period, in seconds
-#*.sink.file.period=10
-
-# *.sink.timeline.class=org.apache.hadoop.metrics2.sink.timeline.HadoopTimelineMetricsSink
-# *.sink.timeline.period=60
-
-# llapdeamon metrics for all contexts (jvm,queue,executors,cache) will go to this file
-# llapdaemon.sink.file.filename=llapdaemon-metrics.out
-
-# to configure separate files per context define following for each context
-# llapdaemon.sink.file_jvm.class=org.apache.hadoop.metrics2.sink.FileSink
-# llapdaemon.sink.file_jvm.context=jvm
-# llapdaemon.sink.file_jvm.filename=llapdaemon-jvm-metrics.out

http://git-wip-us.apache.org/repos/asf/hive/blob/0cc40456/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index da1e17f..733049d 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -267,7 +267,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
 
     if (initMetrics) {
       // Initialize the metrics system
-      LlapMetricsSystem.initialize("LlapDaemon");
+      LlapMetricsSystem.initialize("LlapTaskScheduler");
       this.pauseMonitor = new JvmPauseMonitor(conf);
       pauseMonitor.start();
       String displayName = "LlapTaskSchedulerMetrics-" + MetricsUtils.getHostName();
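
Note that the name passed to LlapMetricsSystem.initialize() is what ties the process to its
metrics2 config: hadoop-metrics2 conventionally loads hadoop-metrics2-<name lowercased>.properties,
hence the new hadoop-metrics2-llaptaskscheduler.properties.template above (the lookup
convention is an assumption about the metrics2 library, not something this diff shows):

    // from the diff above: scheduler metrics now resolve the "llaptaskscheduler" config
    LlapMetricsSystem.initialize("LlapTaskScheduler");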

http://git-wip-us.apache.org/repos/asf/hive/blob/0cc40456/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java
index b3230e2..04fd815 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java
@@ -46,9 +46,9 @@ import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
 import org.apache.hadoop.metrics2.source.JvmMetrics;
 
 /**
- * Metrics about the llap daemon task scheduler.
+ * Metrics about the llap task scheduler.
  */
-@Metrics(about = "LlapDaemon Task Scheduler Metrics", context = "scheduler")
+@Metrics(about = "Llap Task Scheduler Metrics", context = "scheduler")
 public class LlapTaskSchedulerMetrics implements MetricsSource {
 
   private final String name;
@@ -99,7 +99,7 @@ public class LlapTaskSchedulerMetrics implements MetricsSource {
   public void getMetrics(MetricsCollector collector, boolean b) {
     MetricsRecordBuilder rb = collector.addRecord(SchedulerMetrics)
         .setContext("scheduler")
-        .tag(ProcessName, MetricsUtils.METRICS_PROCESS_NAME)
+        .tag(ProcessName, "DAGAppMaster")
         .tag(SessionId, sessionId);
     getTaskSchedulerStats(rb);
   }


[07/13] hive git commit: HIVE-13395 (addendum) Lost Update problem in ACID (Eugene Koifman, reviewed by Alan Gates)

Posted by jd...@apache.org.
HIVE-13395 (addendum) Lost Update problem in ACID (Eugene Koifman, reviewed by Alan Gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/eb2c54b3
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/eb2c54b3
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/eb2c54b3

Branch: refs/heads/llap
Commit: eb2c54b3f80d958c36c22dfb0ee962806e673830
Parents: 794f161
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Thu May 5 15:29:00 2016 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Thu May 5 15:29:00 2016 -0700

----------------------------------------------------------------------
 .../scripts/upgrade/mysql/hive-txn-schema-1.3.0.mysql.sql | 10 ++++++++++
 1 file changed, 10 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/eb2c54b3/metastore/scripts/upgrade/mysql/hive-txn-schema-1.3.0.mysql.sql
----------------------------------------------------------------------
diff --git a/metastore/scripts/upgrade/mysql/hive-txn-schema-1.3.0.mysql.sql b/metastore/scripts/upgrade/mysql/hive-txn-schema-1.3.0.mysql.sql
index ea42757..d873012 100644
--- a/metastore/scripts/upgrade/mysql/hive-txn-schema-1.3.0.mysql.sql
+++ b/metastore/scripts/upgrade/mysql/hive-txn-schema-1.3.0.mysql.sql
@@ -34,6 +34,7 @@ CREATE TABLE TXN_COMPONENTS (
   TC_DATABASE varchar(128) NOT NULL,
   TC_TABLE varchar(128),
   TC_PARTITION varchar(767),
+  TC_OPERATION_TYPE char(1) NOT NULL,
   FOREIGN KEY (TC_TXNID) REFERENCES TXNS (TXN_ID)
 ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
 
@@ -120,3 +121,12 @@ CREATE TABLE AUX_TABLE (
   PRIMARY KEY(MT_KEY1, MT_KEY2)
 ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
 
+CREATE TABLE WRITE_SET (
+  WS_DATABASE varchar(128) NOT NULL,
+  WS_TABLE varchar(128) NOT NULL,
+  WS_PARTITION varchar(767),
+  WS_TXNID bigint NOT NULL,
+  WS_COMMIT_ID bigint NOT NULL,
+  WS_OPERATION_TYPE char(1) NOT NULL
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
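
For intuition, the write-conflict check that commitTxn() runs against this table (see the
TxnHandler changes in this series) boils down to a self-join; a sketch in MySQL terms, with
:txnid standing in for the committing transaction's id:

    SELECT committed.ws_txnid, committed.ws_commit_id
    FROM WRITE_SET committed JOIN WRITE_SET cur
      ON committed.ws_database = cur.ws_database
     AND committed.ws_table = cur.ws_table
     AND (committed.ws_partition = cur.ws_partition
          OR (committed.ws_partition IS NULL AND cur.ws_partition IS NULL))
    WHERE cur.ws_txnid <= committed.ws_commit_id   -- the two txns overlap in time
      AND cur.ws_txnid = :txnid                    -- rows written by the committing txn
      AND committed.ws_txnid <> :txnid             -- rows from already-committed txns
      AND (committed.ws_operation_type = 'u' OR cur.ws_operation_type = 'u');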


[02/13] hive git commit: HIVE-13395 Lost Update problem in ACID (Eugene Koifman, reviewed by Alan Gates)

Posted by jd...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index c0fa97a..06cd4aa 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -72,7 +72,7 @@ import java.util.regex.Pattern;
  * used to properly sequence operations.  Most notably:
  * 1. various sequence IDs are generated with aid of this mutex
  * 2. ensuring that each (Hive) Transaction state is transitioned atomically.  Transaction state
- *  includes it's actual state (Open, Aborted) as well as it's lock list/component list.  Thus all
+ *  includes its actual state (Open, Aborted) as well as its lock list/component list.  Thus all
  *  per transaction ops, either start by update/delete of the relevant TXNS row or do S4U on that row.
  *  This allows almost all operations to run at READ_COMMITTED and minimizes DB deadlocks.
  * 3. checkLock() - this is mutexted entirely since we must ensure that while we check if some lock
@@ -126,6 +126,41 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
   static private DataSource connPool;
   static private boolean doRetryOnConnPool = false;
+  
+  private enum OperationType {
+    INSERT('i'), UPDATE('u'), DELETE('d');
+    private final char sqlConst;
+    OperationType(char sqlConst) {
+      this.sqlConst = sqlConst;
+    }
+    public String toString() {
+      return Character.toString(sqlConst);
+    }
+    public static OperationType fromString(char sqlConst) {
+      switch (sqlConst) {
+        case 'i':
+          return INSERT;
+        case 'u':
+          return UPDATE;
+        case 'd':
+          return DELETE;
+        default:
+          throw new IllegalArgumentException(quoteChar(sqlConst));
+      }
+    }
+    //we should instead just pass in OperationType from client (HIVE-13622)
+    @Deprecated
+    public static OperationType fromLockType(LockType lockType) {
+      switch (lockType) {
+        case SHARED_READ:
+          return INSERT;
+        case SHARED_WRITE:
+          return UPDATE;
+        default:
+          throw new IllegalArgumentException("Unexpected lock type: " + lockType);
+      }
+    }
+  }
 
   /**
    * Number of consecutive deadlocks we have seen
@@ -454,6 +489,31 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     }
   }
 
+  /**
+   * Concurrency/isolation notes:
+   * This is mutexed with {@link #openTxns(OpenTxnRequest)} and other {@link #commitTxn(CommitTxnRequest)}
+   * operations using select4update on NEXT_TXN_ID.  Also, mutexes on TXNS table for specific txnid:X
+   * see more notes below.
+   * In order to prevent lost updates, we need to determine if any 2 transactions overlap.  Each txn
+   * is viewed as an interval [M,N]. M is the txnid and N is taken from the same NEXT_TXN_ID sequence
+   * so that we can compare commit time of txn T with start time of txn S.  This sequence can be thought of
+   * as a logical time counter.  If S.commitTime < T.startTime, T and S do NOT overlap.
+   *
+   * Motivating example:
+   * Suppose we have multi-statement transactions T and S, both of which are attempting x = x + 1
+   * In order to prevent the lost update problem, the non-overlapping txns must lock in the snapshot
+   * that they read appropriately.  In particular, if txns do not overlap, then one follows the other
+   * (assuming they write the same entity), and thus the 2nd must see changes of the 1st.  We ensure
+   * this by locking in the snapshot after the
+   * {@link #openTxns(OpenTxnRequest)} call is made (see {@link org.apache.hadoop.hive.ql.Driver#acquireLocksAndOpenTxn()})
+   * and mutexing openTxn() with commit().  In other words, once a S.commit() starts we must ensure
+   * that txn T which will be considered a later txn, locks in a snapshot that includes the result
+   * of S's commit (assuming no other txns).
+   * As a counterexample, suppose we have S[3,3] and T[4,4] (commitId=txnid means no other transactions
+   * were running in parallel).  If T and S both locked in the same snapshot (for example the commit of
+   * txnid:2, which is possible if commitTxn() and openTxn() are not mutexed),
+   * 'x' would be updated to the same value by both, i.e. a lost update.
+   */
   public void commitTxn(CommitTxnRequest rqst)
     throws NoSuchTxnException, TxnAbortedException,  MetaException {
     long txnid = rqst.getTxnid();
@@ -461,40 +521,116 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       Connection dbConn = null;
       Statement stmt = null;
       ResultSet lockHandle = null;
+      ResultSet commitIdRs = null, rs;
       try {
         lockInternal();
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        /**
+         * This S4U will mutex with other commitTxn() and openTxns(). 
+         * -1 below makes txn intervals look like [3,3] [4,4] if all txns are serial
+         * Note: it's possible for several txns to have the same commit id.  Suppose 3 txns start
+         * at the same time and no new txns start until all 3 commit.
+         * We could've incremented the sequence for commitId as well, but it doesn't add anything functionally.
+         */
+        commitIdRs = stmt.executeQuery(addForUpdateClause("select ntxn_next - 1 from NEXT_TXN_ID"));
+        if(!commitIdRs.next()) {
+          throw new IllegalStateException("No rows found in NEXT_TXN_ID");
+        }
+        long commitId = commitIdRs.getLong(1);
         /**
          * Runs at READ_COMMITTED with S4U on TXNS row for "txnid".  S4U ensures that no other
          * operation can change this txn (such acquiring locks). While lock() and commitTxn()
          * should not normally run concurrently (for same txn) but could due to bugs in the client
          * which could then corrupt internal transaction manager state.  Also competes with abortTxn().
          */
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-
         lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN);
         if(lockHandle == null) {
           //this also ensures that txn is still there and in expected state (hasn't been timed out)
           ensureValidTxn(dbConn, txnid, stmt);
           shouldNeverHappen(txnid);
         }
-
+        Savepoint undoWriteSetForCurrentTxn = dbConn.setSavepoint();
+        int numCompsWritten = stmt.executeUpdate("insert into WRITE_SET (ws_database, ws_table, ws_partition, ws_txnid, ws_commit_id, ws_operation_type)" +
+          " select tc_database, tc_table, tc_partition, tc_txnid, " + commitId + ", tc_operation_type " +
+          "from TXN_COMPONENTS where tc_txnid=" + txnid + " and tc_operation_type IN(" + quoteChar(OpertaionType.UPDATE.sqlConst) + "," + quoteChar(OpertaionType.DELETE.sqlConst) + ")");
+        if(numCompsWritten == 0) {
+          /**
+           * current txn didn't update/delete anything (may have inserted), so just proceed with commit
+           * 
+           * We only care about commit id for write txns, so for RO (when supported) txns we don't
+           * have to mutex on NEXT_TXN_ID.
+           * Consider: if RO txn is after a W txn, then RO's openTxns() will be mutexed with W's
+           * commitTxn() because both do S4U on NEXT_TXN_ID and thus RO will see result of W txn.
+           * If RO < W, then there is no reads-from relationship.
+           */
+        }
+        else {
+          /**
+           * see if any overlapping txns wrote the same element, i.e. whether there is a conflict
+           * Since entire commit operation is mutexed wrt other start/commit ops,
+           * committed.ws_commit_id <= current.ws_commit_id for all txns
+           * thus if committed.ws_commit_id < current.ws_txnid, transactions do NOT overlap
+           * For example, [17,20] is committed, [6,80] is being committed right now - these overlap
+           * [17,20] committed and [21,21] committing now - these do not overlap.
+           * [17,18] committed and [18,19] committing now - these overlap  (here 18 started while 17 was still running)
+           */
+          rs = stmt.executeQuery
+            (addLimitClause(1, "committed.ws_txnid, committed.ws_commit_id, committed.ws_database," +
+              "committed.ws_table, committed.ws_partition, cur.ws_commit_id " + 
+              "from WRITE_SET committed INNER JOIN WRITE_SET cur " +
+            "ON committed.ws_database=cur.ws_database and committed.ws_table=cur.ws_table " +
+              //For partitioned table we always track writes at partition level (never at table)
+              //and for non partitioned - always at table level, thus the same table should never
+              //have entries with partition key and w/o
+            "and (committed.ws_partition=cur.ws_partition or (committed.ws_partition is null and cur.ws_partition is null)) " +
+            "where cur.ws_txnid <= committed.ws_commit_id" + //txns overlap; could replace ws_txnid
+              // with txnid, though any decent DB should infer this
+            " and cur.ws_txnid=" + txnid + //make sure RHS of join only has rows we just inserted as
+              // part of this commitTxn() op
+            " and committed.ws_txnid <> " + txnid + //and LHS only has committed txns
+              //U+U and U+D are conflicts but D+D is not, and we don't currently track I in WRITE_SET at all
+              " and (committed.ws_operation_type=" + quoteChar(OperationType.UPDATE.sqlConst) +
+                    " OR cur.ws_operation_type=" + quoteChar(OperationType.UPDATE.sqlConst) + ")"));
+          if(rs.next()) {
+            //found a conflict
+            String committedTxn = "[" + JavaUtils.txnIdToString(rs.getLong(1)) + "," + rs.getLong(2) + "]";
+            StringBuilder resource = new StringBuilder(rs.getString(3)).append("/").append(rs.getString(4));
+            String partitionName = rs.getString(5);
+            if(partitionName != null) {
+              resource.append('/').append(partitionName);
+            }
+            String msg = "Aborting [" + JavaUtils.txnIdToString(txnid) + "," + rs.getLong(6) + "]" + " due to a write conflict on " + resource +
+              " committed by " + committedTxn;
+            close(rs);
+            //remove WRITE_SET info for current txn since it's about to abort
+            dbConn.rollback(undoWriteSetForCurrentTxn);
+            LOG.info(msg);
+            //todo: should make abortTxns() write something into TXNS.TXN_META_INFO about this
+            if(abortTxns(dbConn, Collections.singletonList(txnid)) != 1) {
+              throw new IllegalStateException(msg + " FAILED!");
+            }
+            dbConn.commit();
+            close(null, stmt, dbConn);
+            throw new TxnAbortedException(msg);
+          }
+          else {
+            //no conflicting operations, proceed with the rest of commit sequence
+          }
+        }
         // Move the record from txn_components into completed_txn_components so that the compactor
         // knows where to look to compact.
         String s = "insert into COMPLETED_TXN_COMPONENTS select tc_txnid, tc_database, tc_table, " +
           "tc_partition from TXN_COMPONENTS where tc_txnid = " + txnid;
         LOG.debug("Going to execute insert <" + s + ">");
         if (stmt.executeUpdate(s) < 1) {
-          //this can be reasonable for an empty txn START/COMMIT
+          //this can be reasonable for an empty txn START/COMMIT or read-only txn
           LOG.info("Expected to move at least one record from txn_components to " +
             "completed_txn_components when committing txn! " + JavaUtils.txnIdToString(txnid));
         }
-
-        // Always access TXN_COMPONENTS before HIVE_LOCKS;
         s = "delete from TXN_COMPONENTS where tc_txnid = " + txnid;
         LOG.debug("Going to execute update <" + s + ">");
         stmt.executeUpdate(s);
-        // Always access HIVE_LOCKS before TXNS
         s = "delete from HIVE_LOCKS where hl_txnid = " + txnid;
         LOG.debug("Going to execute update <" + s + ">");
         stmt.executeUpdate(s);
@@ -510,6 +646,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         throw new MetaException("Unable to update transaction database "
           + StringUtils.stringifyException(e));
       } finally {
+        close(commitIdRs);
         close(lockHandle, stmt, dbConn);
         unlockInternal();
       }
@@ -517,7 +654,50 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       commitTxn(rqst);
     }
   }
-
+  @Override
+  public void performWriteSetGC() {
+    Connection dbConn = null;
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+      stmt = dbConn.createStatement();
+      rs = stmt.executeQuery("select ntxn_next - 1 from NEXT_TXN_ID");
+      if(!rs.next()) {
+        throw new IllegalStateException("NEXT_TXN_ID is empty: DB is corrupted");
+      }
+      long highestAllocatedTxnId = rs.getLong(1);
+      close(rs);
+      rs = stmt.executeQuery("select min(txn_id) from TXNS where txn_state=" + quoteChar(TXN_OPEN));
+      if(!rs.next()) {
+        throw new IllegalStateException("Scalar query returned no rows?!?!!");
+      }
+      long commitHighWaterMark;//all currently open txns (if any) have txnid >= commitHighWaterMark
+      long lowestOpenTxnId = rs.getLong(1);
+      if(rs.wasNull()) {
+        //if here then there are no Open txns and  highestAllocatedTxnId must be
+        //resolved (i.e. committed or aborted), either way
+        //there are no open txns with id <= highestAllocatedTxnId
+        //the +1 is there because "delete ..." below has < (which is correct for the case when
+        //there is an open txn)
+        //Concurrency: even if new txn starts (or starts + commits) it is still true that
+        //there are no currently open txns that overlap with any committed txn with 
+        //commitId <= commitHighWaterMark (as set on next line).  So plain READ_COMMITTED is enough.
+        commitHighWaterMark = highestAllocatedTxnId + 1;
+      }
+      else {
+        commitHighWaterMark = lowestOpenTxnId;
+      }
+      int delCnt = stmt.executeUpdate("delete from WRITE_SET where ws_commit_id < " + commitHighWaterMark);
+      LOG.info("Deleted " + delCnt + " obsolete rows from WRTIE_SET");
+      dbConn.commit();
+    } catch (SQLException ex) {
+      LOG.warn("WriteSet GC failed due to " + getMessage(ex), ex);
+    }
+    finally {
+      close(rs, stmt, dbConn);
+    }
+  }
   /**
    * As much as possible (i.e. in absence of retries) we want both operations to be done on the same
   * connection (but separate transactions).  This avoids some flakiness in BONECP where if you
@@ -545,7 +725,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
   /**
    * Note that by definition select for update is divorced from update, i.e. you executeQuery() to read
-   * and then executeUpdate().  One other alternative would be to actually update the row in TXNX but
+   * and then executeUpdate().  One other alternative would be to actually update the row in TXNS but
    * to the same value as before thus forcing db to acquire write lock for duration of the transaction.
    *
    * There is no real reason to return the ResultSet here other than to make sure the reference to it
@@ -616,6 +796,19 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         stmt.executeUpdate(s);
 
         if (txnid > 0) {
+          /**DBTxnManager#acquireLocks() knows if it's I/U/D (that's how it decides what lock to get)
+           * So if we add that to LockRequest we'll know that here 
+           * Should probably add it to LockComponent so that if in the future we decide to allow 1 LockRequest
+           * to contain LockComponent for multiple operations.
+           * Deriving it from lock info doesn't distinguish between Update and Delete
+           * 
+           * QueryPlan has BaseSemanticAnalyzer which has acidFileSinks list of FileSinkDesc
+           * FileSinkDesc.table is ql.metadata.Table
+           * Table.tableSpec which is TableSpec, which has specType which is SpecType
+           * So maybe this can work to know that this is part of a dynamic partition insert, in which case
+           * we'll get an addDynamicPartitions() call and should not write TXN_COMPONENTS here.
+           * In any case, that's an optimization for now;  will be required when adding multi-stmt txns
+           */
           // For each component in this lock request,
           // add an entry to the txn_components table
           // This must be done before HIVE_LOCKS is accessed
@@ -624,10 +817,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
             String tblName = lc.getTablename();
             String partName = lc.getPartitionname();
             s = "insert into TXN_COMPONENTS " +
-              "(tc_txnid, tc_database, tc_table, tc_partition) " +
+              "(tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type) " +
               "values (" + txnid + ", '" + dbName + "', " +
               (tblName == null ? "null" : "'" + tblName + "'") + ", " +
-              (partName == null ? "null" : "'" + partName + "'") + ")";
+              (partName == null ? "null" : "'" + partName + "'")+ "," +
+              quoteString(OperationType.fromLockType(lc.getType()).toString()) + ")";
             LOG.debug("Going to execute update <" + s + ">");
             stmt.executeUpdate(s);
           }
@@ -698,9 +892,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         lockInternal();
         if(dbConn.isClosed()) {
           //should only get here if retrying this op
-          dbConn = getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+          dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         }
-        dbConn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
         return checkLock(dbConn, extLockId);
       } catch (SQLException e) {
         LOG.debug("Going to rollback");
@@ -756,7 +949,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         //todo: strictly speaking there is a bug here.  heartbeat*() commits but both heartbeat and
        //checkLock() are in the same retry block, so if checkLock() throws, heartbeat is also retried
         //extra heartbeat is logically harmless, but ...
-        dbConn.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
         return checkLock(dbConn, extLockId);
       } catch (SQLException e) {
         LOG.debug("Going to rollback");
@@ -1162,11 +1354,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     throw new RuntimeException("This should never happen: " + JavaUtils.txnIdToString(txnid) + " "
       + JavaUtils.lockIdToString(extLockId) + " " + intLockId);
   }
+
   public void addDynamicPartitions(AddDynamicPartitions rqst)
       throws NoSuchTxnException,  TxnAbortedException, MetaException {
     Connection dbConn = null;
     Statement stmt = null;
     ResultSet lockHandle = null;
+    ResultSet rs = null;
     try {
       try {
         lockInternal();
@@ -1178,18 +1372,35 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           ensureValidTxn(dbConn, rqst.getTxnid(), stmt);
           shouldNeverHappen(rqst.getTxnid());
         }
+        //we should be able to get this from AddDynamicPartitions object longer term; in fact we'd have to
+        //for multi stmt txns if same table is written more than once per tx
+        // MoveTask knows if it's I/U/D
+        // MoveTask calls Hive.loadDynamicPartitions() which calls HiveMetaStoreClient.addDynamicPartitions()
+        // which ends up here so we'd need to add a field to AddDynamicPartitions.
+        String findOperationType = " tc_operation_type from TXN_COMPONENTS where tc_txnid=" + rqst.getTxnid()
+          + " and tc_database=" + quoteString(rqst.getDbname()) + " and tc_table=" + quoteString(rqst.getTablename());
+        //do limit 1 on this; currently they will all have the same operations
+        rs = stmt.executeQuery(addLimitClause(1, findOperationType));
+        if(!rs.next()) {
+          throw new IllegalStateException("Unable to determine tc_operation_type for " + JavaUtils.txnIdToString(rqst.getTxnid()));
+        }
+        OperationType ot = OperationType.fromString(rs.getString(1).charAt(0));
+        
+        //what if a txn writes the same table > 1 time... let's go with this for now, but really
+        //need to not write this in the first place, i.e. make this delete not needed
+        //see enqueueLockWithRetry() - that's where we write to TXN_COMPONENTS
+        String deleteSql = "delete from TXN_COMPONENTS where tc_txnid=" + rqst.getTxnid() + " and tc_database=" +
+          quoteString(rqst.getDbname()) + " and tc_table=" + quoteString(rqst.getTablename());
+        //we delete the entries made by enqueueLockWithRetry() since those are based on lock information which is
+        //much "wider" than necessary in a lot of cases.  Here on the other hand, we know exactly which
+        //partitions have been written to.  w/o this WRITE_SET would contain entries for partitions not actually
+        //written to
+        stmt.executeUpdate(deleteSql);
         for (String partName : rqst.getPartitionnames()) {
-          StringBuilder buff = new StringBuilder();
-          buff.append("insert into TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition) values (");
-          buff.append(rqst.getTxnid());
-          buff.append(", '");
-          buff.append(rqst.getDbname());
-          buff.append("', '");
-          buff.append(rqst.getTablename());
-          buff.append("', '");
-          buff.append(partName);
-          buff.append("')");
-          String s = buff.toString();
+          String s =
+            "insert into TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type) values (" +
+              rqst.getTxnid() + "," + quoteString(rqst.getDbname()) + "," + quoteString(rqst.getTablename()) +
+              "," + quoteString(partName) + "," + quoteChar(ot.sqlConst) + ")";
           LOG.debug("Going to execute update <" + s + ">");
           stmt.executeUpdate(s);
         }
@@ -1908,60 +2119,113 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     return txnId != 0;
   }
   /**
+   * Lock acquisition is meant to be fair, so every lock can only block on some lock with smaller
+   * hl_lock_ext_id by only checking earlier locks.
+   *
+   * For any given SQL statement all locks required by it are grouped under a single extLockId and are
+   * granted all at once or all locks wait.
+   *
+   * This is expected to run at READ_COMMITTED.
+   *
    * Note: this calls acquire() for (extLockId,intLockId) but extLockId is the same and we either take
    * all locks for given extLockId or none.  Would be more efficient to update state on all locks
-   * at once.  Semantics are the same since this is all part of the same txn@serializable.
+   * at once.  Semantics are the same since this is all part of the same txn.
    *
-   * Lock acquisition is meant to be fair, so every lock can only block on some lock with smaller
-   * hl_lock_ext_id by only checking earlier locks.
+   * If there is a concurrent commitTxn/rollbackTxn, those can only remove rows from HIVE_LOCKS.
+   * If they happen to be for the same txnid, there will be a WW conflict (in MS DB); if different txnid,
+   * checkLock() will in the worst case keep locks in Waiting state a little longer.
    */
-  private LockResponse checkLock(Connection dbConn,
-                                 long extLockId)
+  private LockResponse checkLock(Connection dbConn, long extLockId)
     throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, MetaException, SQLException {
-    if(dbConn.getTransactionIsolation() != Connection.TRANSACTION_SERIALIZABLE) {
-      //longer term we should instead use AUX_TABLE/S4U to serialize all checkLock() operations
-      //that would be less prone to deadlocks
-      throw new IllegalStateException("Unexpected Isolation Level: " + dbConn.getTransactionIsolation());
-    }
-    List<LockInfo> locksBeingChecked = getLockInfoFromLockId(dbConn, extLockId);//being acquired now
+    TxnStore.MutexAPI.LockHandle handle =  null;
+    Statement stmt = null;
+    ResultSet rs = null;
     LockResponse response = new LockResponse();
-    response.setLockid(extLockId);
-
-    LOG.debug("checkLock(): Setting savepoint. extLockId=" + JavaUtils.lockIdToString(extLockId));
-    Savepoint save = dbConn.setSavepoint();//todo: get rid of this
-    StringBuilder query = new StringBuilder("select hl_lock_ext_id, " +
-      "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " +
-      "hl_lock_type, hl_txnid from HIVE_LOCKS where hl_db in (");
-
-    Set<String> strings = new HashSet<String>(locksBeingChecked.size());
-    for (LockInfo info : locksBeingChecked) {
-      strings.add(info.db);
-    }
-    boolean first = true;
-    for (String s : strings) {
-      if (first) first = false;
-      else query.append(", ");
-      query.append('\'');
-      query.append(s);
-      query.append('\'');
-    }
-    query.append(")");
-
-    // If any of the table requests are null, then I need to pull all the
-    // table locks for this db.
-    boolean sawNull = false;
-    strings.clear();
-    for (LockInfo info : locksBeingChecked) {
-      if (info.table == null) {
-        sawNull = true;
-        break;
-      } else {
-        strings.add(info.table);
+    /**
+     * todo: Longer term we should pass this from client somehow - this would be an optimization;  once
+     * that is in place make sure to build and test "writeSet" below using OperationType not LockType
+     */
+    boolean isPartOfDynamicPartitionInsert = true;
+    try {
+      /**
+       * checkLock() must be mutexed against any other checkLock to make sure 2 conflicting locks
+       * are not granted by parallel checkLock() calls.
+       */
+      handle = getMutexAPI().acquireLock(MUTEX_KEY.CheckLock.name());
+      List<LockInfo> locksBeingChecked = getLockInfoFromLockId(dbConn, extLockId);//being acquired now
+      response.setLockid(extLockId);
+
+      LOG.debug("checkLock(): Setting savepoint. extLockId=" + JavaUtils.lockIdToString(extLockId));
+      Savepoint save = dbConn.setSavepoint();//todo: get rid of this
+      StringBuilder query = new StringBuilder("select hl_lock_ext_id, " +
+        "hl_lock_int_id, hl_db, hl_table, hl_partition, hl_lock_state, " +
+        "hl_lock_type, hl_txnid from HIVE_LOCKS where hl_db in (");
+
+      Set<String> strings = new HashSet<String>(locksBeingChecked.size());
+
+      //This is the set of entities that the statement represented by extLockId wants to update
+      List<LockInfo> writeSet = new ArrayList<>();
+
+      for (LockInfo info : locksBeingChecked) {
+        strings.add(info.db);
+        if(!isPartOfDynamicPartitionInsert && info.type == LockType.SHARED_WRITE) {
+          writeSet.add(info);
+        }
       }
-    }
-    if (!sawNull) {
-      query.append(" and (hl_table is null or hl_table in(");
-      first = true;
+      if(!writeSet.isEmpty()) {
+        if(writeSet.get(0).txnId == 0) {
+          //Write operation always start a txn
+          throw new IllegalStateException("Found Write lock for " + JavaUtils.lockIdToString(extLockId) + " but no txnid");
+        }
+        stmt = dbConn.createStatement();
+        StringBuilder sb = new StringBuilder(" ws_database, ws_table, ws_partition, " +
+          "ws_txnid, ws_commit_id " +
+          "from WRITE_SET where ws_commit_id >= " + writeSet.get(0).txnId + " and (");//see commitTxn() for more info on this inequality
+        for(LockInfo info : writeSet) {
+          sb.append("(ws_database = ").append(quoteString(info.db)).append(" and ws_table = ")
+            .append(quoteString(info.table)).append(" and ws_partition ")
+            .append(info.partition == null ? "is null" : "= " + quoteString(info.partition)).append(") or ");
+        }
+        sb.setLength(sb.length() - 4);//nuke trailing " or "
+        sb.append(")");
+        //1 row is sufficient to know we have to kill the query
+        rs = stmt.executeQuery(addLimitClause(1, sb.toString()));
+        if(rs.next()) {
+          /**
+           * if here, it means we found an already committed txn which overlaps with the current one and
+           * it updated the same resource the current txn wants to update.  By First-committer-wins
+           * rule, current txn will not be allowed to commit so  may as well kill it now;  This is just an
+           * optimization to prevent wasting cluster resources to run a query which is known to be DOA.
+           * {@link #commitTxn(CommitTxnRequest)} has the primary responsibility to ensure this.
+           * checkLock() runs at READ_COMMITTED so you could have another (Hive) txn running commitTxn()
+           * in parallel and thus writing to WRITE_SET.  commitTxn() logic is properly mutexed to ensure
+           * that we don't "miss" any WW conflicts. We could've mutexed the checkLock() and commitTxn()
+           * as well but this reduces concurrency for very little gain.
+           * Note that update/delete (which runs as dynamic partition insert) acquires a lock on the table,
+           * but WRITE_SET has entries for actual partitions updated.  Thus this optimization will "miss"
+           * the WW conflict but it will be caught in commitTxn() where actual partitions written are known.
+           * This is OK since we want 2 concurrent updates that update different sets of partitions to both commit.
+           */
+          String resourceName = rs.getString(1) + '/' + rs.getString(2);
+          String partName = rs.getString(3);
+          if(partName != null) {
+            resourceName += '/' + partName;
+          }
+
+          String msg = "Aborting " + JavaUtils.txnIdToString(writeSet.get(0).txnId) +
+            " since a concurrent committed transaction [" + JavaUtils.txnIdToString(rs.getLong(4)) + "," + rs.getLong(5) +
+            "] has already updated resouce '" + resourceName + "'";
+          LOG.info(msg);
+          if(abortTxns(dbConn, Collections.singletonList(writeSet.get(0).txnId)) != 1) {
+            throw new IllegalStateException(msg + " FAILED!");
+          }
+          dbConn.commit();
+          throw new TxnAbortedException(msg);
+        }
+        close(rs, stmt, null);
+      }
+
+      boolean first = true;
       for (String s : strings) {
         if (first) first = false;
         else query.append(", ");
@@ -1969,22 +2233,22 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         query.append(s);
         query.append('\'');
       }
-      query.append("))");
+      query.append(")");
 
-      // If any of the partition requests are null, then I need to pull all
-      // partition locks for this table.
-      sawNull = false;
+      // If any of the table requests are null, then I need to pull all the
+      // table locks for this db.
+      boolean sawNull = false;
       strings.clear();
       for (LockInfo info : locksBeingChecked) {
-        if (info.partition == null) {
+        if (info.table == null) {
           sawNull = true;
           break;
         } else {
-          strings.add(info.partition);
+          strings.add(info.table);
         }
       }
       if (!sawNull) {
-        query.append(" and (hl_partition is null or hl_partition in(");
+        query.append(" and (hl_table is null or hl_table in(");
         first = true;
         for (String s : strings) {
           if (first) first = false;
@@ -1994,14 +2258,35 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           query.append('\'');
         }
         query.append("))");
+
+        // If any of the partition requests are null, then I need to pull all
+        // partition locks for this table.
+        sawNull = false;
+        strings.clear();
+        for (LockInfo info : locksBeingChecked) {
+          if (info.partition == null) {
+            sawNull = true;
+            break;
+          } else {
+            strings.add(info.partition);
+          }
+        }
+        if (!sawNull) {
+          query.append(" and (hl_partition is null or hl_partition in(");
+          first = true;
+          for (String s : strings) {
+            if (first) first = false;
+            else query.append(", ");
+            query.append('\'');
+            query.append(s);
+            query.append('\'');
+          }
+          query.append("))");
+        }
       }
-    }
-    query.append(" and hl_lock_ext_id <= ").append(extLockId);
+      query.append(" and hl_lock_ext_id <= ").append(extLockId);
 
-    LOG.debug("Going to execute query <" + query.toString() + ">");
-    Statement stmt = null;
-    ResultSet rs = null;
-    try {
+      LOG.debug("Going to execute query <" + query.toString() + ">");
       stmt = dbConn.createStatement();
       rs = stmt.executeQuery(query.toString());
       SortedSet<LockInfo> lockSet = new TreeSet<LockInfo>(new LockInfoComparator());
@@ -2117,6 +2402,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       response.setState(LockState.ACQUIRED);
     } finally {
       close(rs, stmt, null);
+      if(handle != null) {
+        handle.releaseLocks();
+      }
     }
     return response;
   }
@@ -2158,7 +2446,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     String s = "update HIVE_LOCKS set hl_lock_state = '" + LOCK_ACQUIRED + "', " +
       //if lock is part of txn, heartbeat info is in txn record
       "hl_last_heartbeat = " + (isValidTxn(lockInfo.txnId) ? 0 : now) +
-    ", hl_acquired_at = " + now + " where hl_lock_ext_id = " +
+    ", hl_acquired_at = " + now + ",HL_BLOCKEDBY_EXT_ID=NULL,HL_BLOCKEDBY_INT_ID=null" + " where hl_lock_ext_id = " +
       extLockId + " and hl_lock_int_id = " + lockInfo.intLockId;
     LOG.debug("Going to execute update <" + s + ">");
     int rc = stmt.executeUpdate(s);
@@ -2238,6 +2526,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       //todo: add LIMIT 1 instead of count - should be more efficient
       s = "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TXNID = " + txnid;
       ResultSet rs2 = stmt.executeQuery(s);
+      //todo: strictly speaking you can commit an empty txn, thus the 2nd conjunct is wrong, but that's only
+      //possible for multi-stmt txns
       boolean alreadyCommitted = rs2.next() && rs2.getInt(1) > 0;
       LOG.debug("Going to rollback");
       dbConn.rollback();
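
A compact restatement of the overlap rule used throughout this patch: every txn is an
interval [txnid, commitId] over the NEXT_TXN_ID logical clock, and a committed txn S can
conflict with a committing txn T only if T started at or before S committed. A tiny sketch
of the predicate (names are illustrative, not from the patch):

    // true if the two txns ran concurrently and a write-write check is needed;
    // false means S committed before T started, so T already sees S's writes
    static boolean overlaps(long committedCommitId, long committingTxnId) {
      return committingTxnId <= committedCommitId;
    }

    // from the javadoc examples: S=[17,20] vs T=[21,21] -> overlaps(20, 21) == false
    //                            S=[17,18] vs T=[18,19] -> overlaps(18, 18) == true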

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 927e9bc..f9cac18 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -47,7 +47,7 @@ import java.util.Set;
 @InterfaceStability.Evolving
 public interface TxnStore {
 
-  public static enum MUTEX_KEY {Initiator, Cleaner, HouseKeeper, CompactionHistory}
+  public static enum MUTEX_KEY {Initiator, Cleaner, HouseKeeper, CompactionHistory, CheckLock, WriteSetCleaner}
   // Compactor states (Should really be enum)
   static final public String INITIATED_RESPONSE = "initiated";
   static final public String WORKING_RESPONSE = "working";
@@ -321,6 +321,12 @@ public interface TxnStore {
   public void purgeCompactionHistory() throws MetaException;
 
   /**
+   * WriteSet tracking is used to ensure proper transaction isolation.  This method deletes the 
+   * transaction metadata once it becomes unnecessary.  
+   */
+  public void performWriteSetGC();
+
+  /**
    * Determine if there are enough consecutive failures compacting a table or partition that no
    * new automatic compactions should be scheduled.  User initiated compactions do not do this
    * check.
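
performWriteSetGC() is intended to be driven by the AcidWriteSetService housekeeper that
HiveMetaStore now starts (first hunk of this commit). A minimal sketch of such a periodic
driver -- the scheduling and interval are assumptions; the real service presumably reuses
the existing housekeeper plumbing:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // hypothetical driver: periodically purge obsolete WRITE_SET rows
    static void scheduleWriteSetGC(final TxnStore txnStore, long intervalSeconds) {
      ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
      pool.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
          // deletes WRITE_SET rows whose ws_commit_id is below the open-txn watermark
          txnStore.performWriteSetGC();
        }
      }, intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
    }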

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index cc9e583..b829d9d 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -69,6 +69,8 @@ public class TxnUtils {
    * @return a valid txn list.
    */
   public static ValidTxnList createValidCompactTxnList(GetOpenTxnsInfoResponse txns) {
+    //todo: this could be more efficient: using "select min(txn_id) from TXNS where txn_state=" +
+    // quoteChar(TXN_OPEN) to compute the HWM...
     long highWater = txns.getTxn_high_water_mark();
     long minOpenTxn = Long.MAX_VALUE;
     long[] exceptions = new long[txns.getOpen_txnsSize()];
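
A rough sketch of the alternative the todo describes, assuming JDBC access to the
TXNS table plus the quoteChar/TXN_OPEN helpers the comment itself names (an
assumption-laden illustration, not patch code):

    // sketch only: let the database find the smallest open txn id directly
    String s = "select min(txn_id) from TXNS where txn_state = " + quoteChar(TXN_OPEN);
    ResultSet rs = stmt.executeQuery(s);
    long minOpenTxn = Long.MAX_VALUE;
    if (rs.next() && rs.getLong(1) > 0) {  // min() is NULL (reads as 0) when nothing is open
      minOpenTxn = rs.getLong(1);
    }
    // the compaction HWM can then be derived from minOpenTxn - 1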

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index 2c1560b..80e3cd6 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -413,7 +413,7 @@ public class TestCompactionTxnHandler {
     lc.setTablename(tableName);
     LockRequest lr = new LockRequest(Arrays.asList(lc), "me", "localhost");
     lr.setTxnid(txnId);
-    LockResponse lock = txnHandler.lock(new LockRequest(Arrays.asList(lc), "me", "localhost"));
+    LockResponse lock = txnHandler.lock(lr);
     assertEquals(LockState.ACQUIRED, lock.getState());
 
     txnHandler.addDynamicPartitions(new AddDynamicPartitions(txnId, dbName, tableName,
@@ -429,8 +429,8 @@ public class TestCompactionTxnHandler {
       assertEquals(dbName, ci.dbname);
       assertEquals(tableName, ci.tableName);
       switch (i++) {
-      case 0: assertEquals("ds=today", ci.partName); break;
-      case 1: assertEquals("ds=yesterday", ci.partName); break;
+        case 0: assertEquals("ds=today", ci.partName); break;
+        case 1: assertEquals("ds=yesterday", ci.partName); break;
       default: throw new RuntimeException("What?");
       }
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index 28d0269..1a118a9 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -483,6 +483,7 @@ public class TestTxnHandler {
     components.clear();
     components.add(comp);
     req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(openTxn());
     res = txnHandler.lock(req);
     assertTrue(res.getState() == LockState.ACQUIRED);
   }
@@ -514,6 +515,7 @@ public class TestTxnHandler {
     components.clear();
     components.add(comp);
     req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(openTxn());
     res = txnHandler.lock(req);
     assertTrue(res.getState() == LockState.WAITING);
   }
@@ -580,6 +582,7 @@ public class TestTxnHandler {
     List<LockComponent> components = new ArrayList<LockComponent>(1);
     components.add(comp);
     LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(openTxn());
     LockResponse res = txnHandler.lock(req);
     assertTrue(res.getState() == LockState.ACQUIRED);
 
@@ -602,6 +605,7 @@ public class TestTxnHandler {
     List<LockComponent> components = new ArrayList<LockComponent>(1);
     components.add(comp);
     LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(openTxn());
     LockResponse res = txnHandler.lock(req);
     assertTrue(res.getState() == LockState.ACQUIRED);
 
@@ -611,6 +615,7 @@ public class TestTxnHandler {
     components.clear();
     components.add(comp);
     req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(openTxn());
     res = txnHandler.lock(req);
     assertTrue(res.getState() == LockState.WAITING);
 
@@ -633,6 +638,7 @@ public class TestTxnHandler {
     List<LockComponent> components = new ArrayList<LockComponent>(1);
     components.add(comp);
     LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(openTxn());
     LockResponse res = txnHandler.lock(req);
     assertTrue(res.getState() == LockState.ACQUIRED);
 
@@ -642,6 +648,7 @@ public class TestTxnHandler {
     components.clear();
     components.add(comp);
     req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(openTxn());
     res = txnHandler.lock(req);
     assertTrue(res.getState() == LockState.WAITING);
 
@@ -651,6 +658,7 @@ public class TestTxnHandler {
     components.clear();
     components.add(comp);
     req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(openTxn());
     res = txnHandler.lock(req);
     assertTrue(res.getState() == LockState.WAITING);
   }
@@ -682,6 +690,7 @@ public class TestTxnHandler {
     components.clear();
     components.add(comp);
     req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(openTxn());
     res = txnHandler.lock(req);
     assertTrue(res.getState() == LockState.WAITING);
   }
@@ -725,6 +734,8 @@ public class TestTxnHandler {
     List<LockComponent> components = new ArrayList<LockComponent>(1);
     components.add(comp);
     LockRequest req = new LockRequest(components, "me", "localhost");
+    long txnId = openTxn();
+    req.setTxnid(txnId);
     LockResponse res = txnHandler.lock(req);
     long lockid1 = res.getLockid();
     assertTrue(res.getState() == LockState.ACQUIRED);
@@ -735,11 +746,12 @@ public class TestTxnHandler {
     components.clear();
     components.add(comp);
     req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(openTxn());
     res = txnHandler.lock(req);
     long lockid2 = res.getLockid();
     assertTrue(res.getState() == LockState.WAITING);
 
-    txnHandler.unlock(new UnlockRequest(lockid1));
+    txnHandler.abortTxn(new AbortTxnRequest(txnId));
     res = txnHandler.checkLock(new CheckLockRequest(lockid2));
     assertTrue(res.getState() == LockState.ACQUIRED);
   }
@@ -1070,16 +1082,14 @@ public class TestTxnHandler {
   @Test
   public void showLocks() throws Exception {
     long begining = System.currentTimeMillis();
-    long txnid = openTxn();
     LockComponent comp = new LockComponent(LockType.EXCLUSIVE, LockLevel.DB, "mydb");
     List<LockComponent> components = new ArrayList<LockComponent>(1);
     components.add(comp);
     LockRequest req = new LockRequest(components, "me", "localhost");
-    req.setTxnid(txnid);
     LockResponse res = txnHandler.lock(req);
 
     // Open txn
-    txnid = openTxn();
+    long txnid = openTxn();
     comp = new LockComponent(LockType.SHARED_READ, LockLevel.TABLE, "mydb");
     comp.setTablename("mytable");
     components = new ArrayList<LockComponent>(1);
@@ -1090,7 +1100,7 @@ public class TestTxnHandler {
 
     // Locks not associated with a txn
     components = new ArrayList<LockComponent>(1);
-    comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.PARTITION, "yourdb");
+    comp = new LockComponent(LockType.SHARED_READ, LockLevel.PARTITION, "yourdb");
     comp.setTablename("yourtable");
     comp.setPartitionname("yourpartition");
     components.add(comp);
@@ -1104,14 +1114,13 @@ public class TestTxnHandler {
     for (int i = 0; i < saw.length; i++) saw[i] = false;
     for (ShowLocksResponseElement lock : locks) {
       if (lock.getLockid() == 1) {
-        assertEquals(1, lock.getTxnid());
+        assertEquals(0, lock.getTxnid());
         assertEquals("mydb", lock.getDbname());
         assertNull(lock.getTablename());
         assertNull(lock.getPartname());
         assertEquals(LockState.ACQUIRED, lock.getState());
         assertEquals(LockType.EXCLUSIVE, lock.getType());
-        assertTrue(lock.toString(), 0 == lock.getLastheartbeat() &&
-            lock.getTxnid() != 0);
+        assertTrue(lock.toString(), 0 != lock.getLastheartbeat());
         assertTrue("Expected acquired at " + lock.getAcquiredat() + " to be between " + begining
             + " and " + System.currentTimeMillis(),
             begining <= lock.getAcquiredat() && System.currentTimeMillis() >= lock.getAcquiredat());
@@ -1119,7 +1128,7 @@ public class TestTxnHandler {
         assertEquals("localhost", lock.getHostname());
         saw[0] = true;
       } else if (lock.getLockid() == 2) {
-        assertEquals(2, lock.getTxnid());
+        assertEquals(1, lock.getTxnid());
         assertEquals("mydb", lock.getDbname());
         assertEquals("mytable", lock.getTablename());
         assertNull(lock.getPartname());
@@ -1137,7 +1146,7 @@ public class TestTxnHandler {
         assertEquals("yourtable", lock.getTablename());
         assertEquals("yourpartition", lock.getPartname());
         assertEquals(LockState.ACQUIRED, lock.getState());
-        assertEquals(LockType.SHARED_WRITE, lock.getType());
+        assertEquals(LockType.SHARED_READ, lock.getType());
         assertTrue(lock.toString(), begining <= lock.getLastheartbeat() &&
             System.currentTimeMillis() >= lock.getLastheartbeat());
         assertTrue(begining <= lock.getAcquiredat() &&

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 1de3309..52dadb7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -377,7 +377,7 @@ public enum ErrorMsg {
       "instantiated, check hive.txn.manager"),
   TXN_NO_SUCH_TRANSACTION(10262, "No record of transaction {0} could be found, " +
       "may have timed out", true),
-  TXN_ABORTED(10263, "Transaction manager has aborted the transaction {0}.", true),
+  TXN_ABORTED(10263, "Transaction manager has aborted the transaction {0}.  Reason: {1}", true),
   DBTXNMGR_REQUIRES_CONCURRENCY(10264,
       "To use DbTxnManager you must set hive.support.concurrency=true"),
   TXNMGR_NOT_ACID(10265, "This command is not allowed on an ACID table {0}.{1} with a non-ACID transaction manager", true),

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
index 7fa57d6..18ed864 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
@@ -172,8 +172,9 @@ public class DbLockManager implements HiveLockManager{
       LOG.error("Metastore could not find " + JavaUtils.txnIdToString(lock.getTxnid()));
       throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(lock.getTxnid()));
     } catch (TxnAbortedException e) {
-      LOG.error("Transaction " + JavaUtils.txnIdToString(lock.getTxnid()) + " already aborted.");
-      throw new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(lock.getTxnid()));
+      LockException le = new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(lock.getTxnid()), e.getMessage());
+      LOG.error(le.getMessage());
+      throw le;
     } catch (TException e) {
       throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(),
           e);

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 3aec8eb..9c2a346 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -107,6 +107,8 @@ public class DbTxnManager extends HiveTxnManagerImpl {
 
   @Override
   public long openTxn(String user) throws LockException {
+    //todo: why don't we lock the snapshot here, instead of having the client make an explicit
+    //call whenever it chooses?
     init();
     if(isTxnOpen()) {
       throw new LockException("Transaction already opened. " + JavaUtils.txnIdToString(txnId));
@@ -132,8 +134,17 @@ public class DbTxnManager extends HiveTxnManagerImpl {
 
   @Override
   public void acquireLocks(QueryPlan plan, Context ctx, String username) throws LockException {
-    acquireLocks(plan, ctx, username, true);
-    startHeartbeat();
+    try {
+      acquireLocks(plan, ctx, username, true);
+      startHeartbeat();
+    }
+    catch(LockException e) {
+      if(e.getCause() instanceof TxnAbortedException) {
+        txnId = 0;
+        statementId = -1;
+      }
+      throw e;
+    }
   }
 
   /**
@@ -157,7 +168,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
     // For each source to read, get a shared lock
     for (ReadEntity input : plan.getInputs()) {
       if (!input.needsLock() || input.isUpdateOrDelete()) {
-        // We don't want to acquire readlocks during update or delete as we'll be acquiring write
+        // We don't want to acquire read locks during update or delete as we'll be acquiring write
         // locks instead.
         continue;
       }
@@ -320,8 +331,9 @@ public class DbTxnManager extends HiveTxnManagerImpl {
       LOG.error("Metastore could not find " + JavaUtils.txnIdToString(txnId));
       throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(txnId));
     } catch (TxnAbortedException e) {
-      LOG.error("Transaction " + JavaUtils.txnIdToString(txnId) + " aborted");
-      throw new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(txnId));
+      LockException le = new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(txnId), e.getMessage());
+      LOG.error(le.getMessage());
+      throw le;
     } catch (TException e) {
       throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(),
           e);
@@ -389,8 +401,9 @@ public class DbTxnManager extends HiveTxnManagerImpl {
         LOG.error("Unable to find transaction " + JavaUtils.txnIdToString(txnId));
         throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(txnId));
       } catch (TxnAbortedException e) {
-        LOG.error("Transaction aborted " + JavaUtils.txnIdToString(txnId));
-        throw new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(txnId));
+        LockException le = new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(txnId), e.getMessage());
+        LOG.error(le.getMessage());
+        throw le;
       } catch (TException e) {
         throw new LockException(
             ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg() + "(" + JavaUtils.txnIdToString(txnId)
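
On the openTxn todo above: as of this patch the snapshot is pinned only when the
client explicitly asks for it, roughly along these lines (a sketch; the exact call
sequence a client uses is an assumption, though both methods exist on
HiveTxnManager):

    // assumption: snapshot acquisition is a separate, client-driven step
    HiveTxnManager txnMgr = SessionState.get().getTxnMgr();
    txnMgr.openTxn("fred");                         // allocates a txn id, no snapshot yet
    ValidTxnList snapshot = txnMgr.getValidTxns();  // the explicit call that pins the read set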

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidWriteSetService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidWriteSetService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidWriteSetService.java
new file mode 100644
index 0000000..9085a6a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidWriteSetService.java
@@ -0,0 +1,61 @@
+package org.apache.hadoop.hive.ql.txn;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.txn.compactor.HouseKeeperServiceBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Periodically cleans WriteSet tracking information used in Transaction management
+ */
+public class AcidWriteSetService extends HouseKeeperServiceBase {
+  private static final Logger LOG = LoggerFactory.getLogger(AcidWriteSetService.class);
+  @Override
+  protected long getStartDelayMs() {
+    return 0;
+  }
+  @Override
+  protected long getIntervalMs() {
+    return hiveConf.getTimeVar(HiveConf.ConfVars.WRITE_SET_REAPER_INTERVAL, TimeUnit.MILLISECONDS);
+  }
+  @Override
+  protected Runnable getScheduedAction(HiveConf hiveConf, AtomicInteger isAliveCounter) {
+    return new WriteSetReaper(hiveConf, isAliveCounter);
+  }
+  @Override
+  public String getServiceDescription() {
+    return "Periodically cleans obsolete WriteSet tracking information used in Transaction management";
+  }
+  private static final class WriteSetReaper implements Runnable {
+    private final TxnStore txnHandler;
+    private final AtomicInteger isAliveCounter;
+    private WriteSetReaper(HiveConf hiveConf, AtomicInteger isAliveCounter) {
+      txnHandler = TxnUtils.getTxnStore(hiveConf);
+      this.isAliveCounter = isAliveCounter;
+    }
+    @Override
+    public void run() {
+      TxnStore.MutexAPI.LockHandle handle = null;
+      try {
+        handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.WriteSetCleaner.name());
+        long startTime = System.currentTimeMillis();
+        txnHandler.performWriteSetGC();
+        int count = isAliveCounter.incrementAndGet();
+        LOG.info("cleaner ran for " + (System.currentTimeMillis() - startTime)/1000 + "seconds.  isAliveCounter=" + count);
+      }
+      catch(Throwable t) {
+        LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t);
+      }
+      finally {
+        if(handle != null) {
+          handle.releaseLocks();
+        }
+      }
+    }
+  }
+}
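
A hedged usage sketch: the TestTxnCommands2 change below makes runHouseKeeperService
public, apparently so tests can drive services like this one a single pass at a time
(the interval value here is illustrative):

    // illustrative test-side usage: run the reaper until one pass completes
    HiveConf conf = new HiveConf();
    conf.setTimeVar(HiveConf.ConfVars.WRITE_SET_REAPER_INTERVAL, 10, TimeUnit.MILLISECONDS);
    AcidWriteSetService service = new AcidWriteSetService();
    TestTxnCommands2.runHouseKeeperService(service, conf);  // waits for isAliveCounter to advance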

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java
index 947f17c..caab10d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/HouseKeeperServiceBase.java
@@ -81,7 +81,7 @@ public abstract class HouseKeeperServiceBase implements HouseKeeperService {
    */
   protected abstract long getStartDelayMs();
   /**
-   * Determines how fequently the service is running its task.
+   * Determines how frequently the service is running its task.
    */
   protected abstract long getIntervalMs();
 

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index abbe5d4..949cbd5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -147,7 +147,7 @@ public class Initiator extends CompactorThread {
               if (compactionNeeded != null) requestCompaction(ci, runAs, compactionNeeded);
             } catch (Throwable t) {
               LOG.error("Caught exception while trying to determine if we should compact " +
-                  ci + ".  Marking clean to avoid repeated failures, " +
+                  ci + ".  Marking failed to avoid repeated failures, " +
                   "" + StringUtils.stringifyException(t));
               txnHandler.markFailed(ci);
             }

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 6238e2b..767c10c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -182,7 +182,7 @@ public class Worker extends CompactorThread {
           txnHandler.markCompacted(ci);
         } catch (Exception e) {
           LOG.error("Caught exception while trying to compact " + ci +
-              ".  Marking clean to avoid repeated failures, " + StringUtils.stringifyException(e));
+              ".  Marking failed to avoid repeated failures, " + StringUtils.stringifyException(e));
           txnHandler.markFailed(ci);
         }
       } catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 1030987..472da0b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -669,7 +669,7 @@ public class TestTxnCommands2 {
     t.run();
   }
 
-  private static void runHouseKeeperService(HouseKeeperService houseKeeperService, HiveConf conf) throws Exception {
+  public static void runHouseKeeperService(HouseKeeperService houseKeeperService, HiveConf conf) throws Exception {
     int lastCount = houseKeeperService.getIsAliveCounter();
     houseKeeperService.start(conf);
     int maxIter = 10;

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index f87dd14..83a2ba3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -65,6 +65,26 @@ public class TestAcidUtils {
     assertEquals("/tmp/delta_0000100_0000200_0007/bucket_00023",
       AcidUtils.createFilename(p, options).toString());
   }
+  @Test
+  public void testCreateFilenameLargeIds() throws Exception {
+    Path p = new Path("/tmp");
+    Configuration conf = new Configuration();
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
+      .setOldStyle(true).bucket(123456789);
+    assertEquals("/tmp/123456789_0",
+      AcidUtils.createFilename(p, options).toString());
+    options.bucket(23)
+      .minimumTransactionId(1234567880)
+      .maximumTransactionId(1234567890)
+      .writingBase(true)
+      .setOldStyle(false);
+    assertEquals("/tmp/base_1234567890/bucket_00023",
+      AcidUtils.createFilename(p, options).toString());
+    options.writingBase(false);
+    assertEquals("/tmp/delta_1234567880_1234567890_0000/bucket_00023",
+      AcidUtils.createFilename(p, options).toString());
+  }
+  
 
   @Test
   public void testParsing() throws Exception {

http://git-wip-us.apache.org/repos/asf/hive/blob/10d05491/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index 3a6e76e..22f7482 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.ql.Context;
@@ -500,6 +501,12 @@ public class TestDbTxnManager {
       partCols.add(fs);
       t.setPartCols(partCols);
     }
+    Map<String, String> tblProps = t.getParameters();
+    if(tblProps == null) {
+      tblProps = new HashMap<>();
+    }
+    tblProps.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true");
+    t.setParameters(tblProps);
     return t;
   }
 


[05/13] hive git commit: HIVE-13619: Bucket map join plan is incorrect (Vikram Dixit K, reviewed by Gunther Hagleitner)

Posted by jd...@apache.org.
HIVE-13619: Bucket map join plan is incorrect (Vikram Dixit K, reviewed by Gunther Hagleitner)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4eb96030
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4eb96030
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4eb96030

Branch: refs/heads/llap
Commit: 4eb960305f6cf30aa6e1011ee09388b1ab4c4fd9
Parents: da82819
Author: vikram <vi...@hortonworks.com>
Authored: Thu May 5 14:35:58 2016 -0700
Committer: vikram <vi...@hortonworks.com>
Committed: Thu May 5 14:35:58 2016 -0700

----------------------------------------------------------------------
 ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/4eb96030/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
index 41507b1..a8ed74c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
@@ -83,7 +83,7 @@ public class OperatorUtils {
 
   public static <T> T findSingleOperatorUpstreamJoinAccounted(Operator<?> start, Class<T> clazz) {
     Set<T> found = findOperatorsUpstreamJoinAccounted(start, clazz, new HashSet<T>());
-    return found.size() == 1 ? found.iterator().next(): null;
+    return found.size() >= 1 ? found.iterator().next(): null;
   }
 
   public static <T> Set<T> findOperatorsUpstream(Collection<Operator<?>> starts, Class<T> clazz) {