You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2016/11/12 20:20:19 UTC

[1/3] hive git commit: HIVE-14943 Base Implementation (of HIVE-10924) (Eugene Koifman, reviewed by Alan Gates)

Repository: hive
Updated Branches:
  refs/heads/master 52ba014fc -> e00f909dd


http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 4e1122d..637a01a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.lockmgr;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
 import org.apache.hadoop.hive.metastore.api.DataOperationType;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -45,6 +46,7 @@ import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -99,6 +101,7 @@ public class TestDbTxnManager2 {
   }
   @Test
   public void testLocksInSubquery() throws Exception {
+    dropTable(new String[] {"T","S", "R"});
     checkCmdOnDriver(driver.run("create table if not exists T (a int, b int)"));
     checkCmdOnDriver(driver.run("create table if not exists S (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"));
     checkCmdOnDriver(driver.run("create table if not exists R (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"));
@@ -132,6 +135,7 @@ public class TestDbTxnManager2 {
   }
   @Test
   public void createTable() throws Exception {
+    dropTable(new String[] {"T"});
     CommandProcessorResponse cpr = driver.compileAndRespond("create table if not exists T (a int, b int)");
     checkCmdOnDriver(cpr);
     txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");
@@ -143,6 +147,7 @@ public class TestDbTxnManager2 {
   }
   @Test
   public void insertOverwriteCreate() throws Exception {
+    dropTable(new String[] {"T2", "T3"});
     CommandProcessorResponse cpr = driver.run("create table if not exists T2(a int)");
     checkCmdOnDriver(cpr);
     cpr = driver.run("create table if not exists T3(a int)");
@@ -163,6 +168,7 @@ public class TestDbTxnManager2 {
   }
   @Test
   public void insertOverwritePartitionedCreate() throws Exception {
+    dropTable(new String[] {"T4"});
     CommandProcessorResponse cpr = driver.run("create table if not exists T4 (name string, gpa double) partitioned by (age int)");
     checkCmdOnDriver(cpr);
     cpr = driver.run("create table if not exists T5(name string, age int, gpa double)");
@@ -183,6 +189,7 @@ public class TestDbTxnManager2 {
   }
   @Test
   public void basicBlocking() throws Exception {
+    dropTable(new String[] {"T6"});
     CommandProcessorResponse cpr = driver.run("create table if not exists T6(a int)");
     checkCmdOnDriver(cpr);
     cpr = driver.compileAndRespond("select a from T6");
@@ -213,6 +220,7 @@ public class TestDbTxnManager2 {
   }
   @Test
   public void lockConflictDbTable() throws Exception {
+    dropTable(new String[] {"temp.T7"});
     CommandProcessorResponse cpr = driver.run("create database if not exists temp");
     checkCmdOnDriver(cpr);
     cpr = driver.run("create table if not exists temp.T7(a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
@@ -240,6 +248,7 @@ public class TestDbTxnManager2 {
   }
   @Test
   public void updateSelectUpdate() throws Exception {
+    dropTable(new String[] {"T8"});
     CommandProcessorResponse cpr = driver.run("create table T8(a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
     checkCmdOnDriver(cpr);
     cpr = driver.compileAndRespond("delete from T8 where b = 89");
@@ -273,6 +282,7 @@ public class TestDbTxnManager2 {
 
   @Test
   public void testLockRetryLimit() throws Exception {
+    dropTable(new String[] {"T9"});
     conf.setIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES, 2);
     conf.setBoolVar(HiveConf.ConfVars.TXN_MGR_DUMP_LOCK_STATE_ON_ACQUIRE_TIMEOUT, true);
     HiveTxnManager otherTxnMgr = new DbTxnManager(); 
@@ -309,6 +319,7 @@ public class TestDbTxnManager2 {
    */
   @Test
   public void testLockBlockedBy() throws Exception {
+    dropTable(new String[] {"TAB_BLOCKED"});
     CommandProcessorResponse cpr = driver.run("create table TAB_BLOCKED (a int, b int) clustered by (a) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')");
     checkCmdOnDriver(cpr);
     cpr = driver.compileAndRespond("select * from TAB_BLOCKED");
@@ -330,6 +341,7 @@ public class TestDbTxnManager2 {
 
   @Test
   public void testDummyTxnManagerOnAcidTable() throws Exception {
+    dropTable(new String[] {"T10", "T11"});
     // Create an ACID table with DbTxnManager
     CommandProcessorResponse cpr = driver.run("create table T10 (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
     checkCmdOnDriver(cpr);
@@ -389,6 +401,8 @@ public class TestDbTxnManager2 {
    */
   @Test
   public void testMetastoreTablesCleanup() throws Exception {
+    dropTable(new String[] {"temp.T10", "temp.T11", "temp.T12p", "temp.T13p"});
+
     CommandProcessorResponse cpr = driver.run("create database if not exists temp");
     checkCmdOnDriver(cpr);
 
@@ -569,6 +583,7 @@ public class TestDbTxnManager2 {
    */
   @Test
   public void checkExpectedLocks() throws Exception {
+    dropTable(new String[] {"acidPart", "nonAcidPart"});
     CommandProcessorResponse cpr = null;
     cpr = driver.run("create table acidPart(a int, b int) partitioned by (p string) clustered by (a) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')");
     checkCmdOnDriver(cpr);
@@ -640,8 +655,7 @@ public class TestDbTxnManager2 {
    */
   @Test
   public void checkExpectedLocks2() throws Exception {
-    checkCmdOnDriver(driver.run("drop table if exists tab_acid"));
-    checkCmdOnDriver(driver.run("drop table if exists tab_not_acid"));
+    dropTable(new String[] {"tab_acid", "tab_not_acid"});
     checkCmdOnDriver(driver.run("create table if not exists tab_acid (a int, b int) partitioned by (p string) " +
       "clustered by (a) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')"));
     checkCmdOnDriver(driver.run("create table if not exists tab_not_acid (na int, nb int) partitioned by (np string) " +
@@ -676,7 +690,7 @@ public class TestDbTxnManager2 {
   }
 
   /** The list is small, and the object is generated, so we don't use sets/equals/etc. */
-  public static void checkLock(LockType expectedType, LockState expectedState, String expectedDb,
+  public static ShowLocksResponseElement checkLock(LockType expectedType, LockState expectedState, String expectedDb,
       String expectedTable, String expectedPartition, List<ShowLocksResponseElement> actuals) {
     for (ShowLocksResponseElement actual : actuals) {
       if (expectedType == actual.getType() && expectedState == actual.getState()
@@ -684,11 +698,12 @@ public class TestDbTxnManager2 {
           && StringUtils.equals(normalizeCase(expectedTable), normalizeCase(actual.getTablename()))
           && StringUtils.equals(
               normalizeCase(expectedPartition), normalizeCase(actual.getPartname()))) {
-        return;
+        return actual;
       }
     }
     Assert.fail("Could't find {" + expectedType + ", " + expectedState + ", " + expectedDb
        + ", " + expectedTable  + ", " + expectedPartition + "} in " + actuals);
+    throw new IllegalStateException("How did it get here?!");
   }
 
   @Test
@@ -878,6 +893,7 @@ public class TestDbTxnManager2 {
    */
   @Test
   public void testWriteSetTracking1() throws Exception {
+    dropTable(new String[] {"TAB_PART"});
     CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " +
       "partitioned by (p string) clustered by (a) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')");
     checkCmdOnDriver(cpr);
@@ -893,11 +909,17 @@ public class TestDbTxnManager2 {
     txnMgr2.acquireLocks(driver.getPlan(), ctx, "Nicholas");
     txnMgr2.commitTxn();
   }
+  private void dropTable(String[] tabs) throws Exception {
+    for(String tab : tabs) {
+      driver.run("drop table if exists " + tab);
+    }
+  }
   /**
    * txns overlap in time but do not update same resource - no conflict
    */
   @Test
   public void testWriteSetTracking2() throws Exception {
+    dropTable(new String[] {"TAB_PART", "TAB2"});
     CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " +
       "partitioned by (p string) clustered by (a) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')");
     checkCmdOnDriver(cpr);
@@ -919,33 +941,42 @@ public class TestDbTxnManager2 {
     txnMgr2.acquireLocks(driver.getPlan(), ctx, "Catherine");
     txnMgr2.commitTxn();
   }
-
   /**
    * txns overlap and update the same resource - can't commit 2nd txn
    */
   @Test
   public void testWriteSetTracking3() throws Exception {
+    dropTable(new String[] {"TAB_PART"});
     CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " +
       "partitioned by (p string) clustered by (a) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')");
     checkCmdOnDriver(cpr);
+    checkCmdOnDriver(driver.run("insert into TAB_PART partition(p='blah') values(1,2)"));
     HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
 
-    txnMgr.openTxn("Known");
-    txnMgr2.openTxn("Unknown");
+    long txnId = txnMgr.openTxn("Known");
+    long txnId2 = txnMgr2.openTxn("Unknown");
     checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
     txnMgr.acquireLocks(driver.getPlan(), ctx, "Known");
     List<ShowLocksResponseElement> locks = getLocks(txnMgr);
     Assert.assertEquals("Unexpected lock count", 1, locks.size());
-    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks);
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", "p=blah", locks);
     checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
     ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Unknown", false);
     locks = getLocks(txnMgr2);//should not matter which txnMgr is used here
     Assert.assertEquals("Unexpected lock count", 2, locks.size());
-    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks);
-    checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB_PART", null, locks);
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", "p=blah", locks);
+    checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB_PART", "p=blah", locks);
+    AddDynamicPartitions adp = new AddDynamicPartitions(txnId, "default", "TAB_PART",
+      Collections.singletonList("p=blah"));
+    adp.setOperationType(DataOperationType.UPDATE);
+    txnHandler.addDynamicPartitions(adp);
     txnMgr.commitTxn();
+    
+    adp.setTxnid(txnId2);
+    txnHandler.addDynamicPartitions(adp);
     LockException expectedException = null;
     try {
+      //with HIVE-15032 this should use static parts and thus not need addDynamicPartitions
       txnMgr2.commitTxn();
     }
     catch (LockException e) {
@@ -953,8 +984,8 @@ public class TestDbTxnManager2 {
     }
     Assert.assertTrue("Didn't get exception", expectedException != null);
     Assert.assertEquals("Got wrong message code", ErrorMsg.TXN_ABORTED, expectedException.getCanonicalErrorMsg());
-    Assert.assertEquals("Exception msg didn't match", 
-      "Aborting [txnid:2,2] due to a write conflict on default/tab_part committed by [txnid:1,2]",
+    Assert.assertEquals("Exception msg didn't match",
+      "Aborting [txnid:3,3] due to a write conflict on default/TAB_PART/p=blah committed by [txnid:2,3] u/u",
       expectedException.getCause().getMessage());
   }
   /**
@@ -963,6 +994,7 @@ public class TestDbTxnManager2 {
    */
   @Test
   public void testWriteSetTracking4() throws Exception {
+    dropTable(new String[] {"TAB_PART", "TAB2"});
     Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
     CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " +
       "partitioned by (p string) clustered by (a) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')");
@@ -1040,26 +1072,33 @@ public class TestDbTxnManager2 {
    */
   @Test
   public void testWriteSetTracking5() throws Exception {
+    dropTable(new String[] {"TAB_PART"});
     Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
     CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " +
       "partitioned by (p string) clustered by (a) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')");
     checkCmdOnDriver(cpr);
+    checkCmdOnDriver(driver.run("insert into TAB_PART partition(p='blah') values(1,2)"));
     HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
 
     txnMgr.openTxn("Known");
-    txnMgr2.openTxn("Unknown");
+    long txnId = txnMgr2.openTxn("Unknown");
     checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
     txnMgr.acquireLocks(driver.getPlan(), ctx, "Known");
     List<ShowLocksResponseElement> locks = getLocks(txnMgr);
     Assert.assertEquals("Unexpected lock count", 1, locks.size());
-    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks);
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", "p=blah", locks);
     checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
     ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Unknown", false);
     locks = getLocks(txnMgr2);//should not matter which txnMgr is used here
     Assert.assertEquals("Unexpected lock count", 2, locks.size());
-    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks);
-    checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB_PART", null, locks);
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", "p=blah", locks);
+    checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB_PART", "p=blah", locks);
     txnMgr.rollbackTxn();
+
+    AddDynamicPartitions adp = new AddDynamicPartitions(txnId, "default", "TAB_PART",
+      Arrays.asList("p=blah"));
+    adp.setOperationType(DataOperationType.UPDATE);
+    txnHandler.addDynamicPartitions(adp);
     Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
     txnMgr2.commitTxn();//since conflicting txn rolled back, commit succeeds
     Assert.assertEquals(1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
@@ -1069,6 +1108,7 @@ public class TestDbTxnManager2 {
    */
   @Test
   public void testWriteSetTracking6() throws Exception {
+    dropTable(new String[] {"TAB2"});
     Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
     CommandProcessorResponse cpr = driver.run("create table if not exists TAB2(a int, b int) clustered " +
       "by (a) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')");
@@ -1102,6 +1142,7 @@ public class TestDbTxnManager2 {
    */
   @Test
   public void testWriteSetTracking7() throws Exception {
+    dropTable(new String[] {"tab2", "TAB2"});
     Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
     CommandProcessorResponse cpr = driver.run("create table if not exists tab2 (a int, b int) " +
       "partitioned by (p string) clustered by (a) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')");
@@ -1209,6 +1250,7 @@ public class TestDbTxnManager2 {
    */
   @Test
   public void testWriteSetTracking8() throws Exception {
+    dropTable(new String[] {"tab1", "TAB1"});
     CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
       "clustered by (a) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')");
     checkCmdOnDriver(cpr);
@@ -1262,6 +1304,7 @@ public class TestDbTxnManager2 {
    */
   @Test
   public void testWriteSetTracking9() throws Exception {
+    dropTable(new String[] {"TAB1"});
     CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
       "clustered by (a) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')");
     checkCmdOnDriver(cpr);
@@ -1321,6 +1364,7 @@ public class TestDbTxnManager2 {
    */
   @Test
   public void testWriteSetTracking10() throws Exception {
+    dropTable(new String[] {"TAB1"});
     CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
       "clustered by (a) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')");
     checkCmdOnDriver(cpr);
@@ -1369,7 +1413,7 @@ public class TestDbTxnManager2 {
     }
     Assert.assertNotEquals("Expected exception", null, exception);
     Assert.assertEquals("Exception msg doesn't match",
-      "Aborting [txnid:3,3] due to a write conflict on default/tab1/p=two committed by [txnid:2,3]",
+      "Aborting [txnid:3,3] due to a write conflict on default/tab1/p=two committed by [txnid:2,3] d/u",
       exception.getCause().getMessage());
 
     Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
@@ -1382,6 +1426,7 @@ public class TestDbTxnManager2 {
    */
   @Test
   public void testWriteSetTracking11() throws Exception {
+    dropTable(new String[] {"TAB1"});
     CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
       "clustered by (a) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')");
     checkCmdOnDriver(cpr);
@@ -1442,6 +1487,7 @@ public class TestDbTxnManager2 {
   }
   @Test
   public void testCompletedTxnComponents() throws Exception {
+    dropTable(new String[] {"TAB1", "tab_not_acid2"});
     CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
       "clustered by (a) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')");
     checkCmdOnDriver(cpr);
@@ -1462,6 +1508,7 @@ public class TestDbTxnManager2 {
    */
   @Test
   public void testMultiInsert() throws Exception {
+    dropTable(new String[] {"TAB1", "tab_not_acid"});
     checkCmdOnDriver(driver.run("drop table if exists tab1"));
     checkCmdOnDriver(driver.run("drop table if exists tab_not_acid"));
     CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
@@ -1518,4 +1565,568 @@ public class TestDbTxnManager2 {
     checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "XYZ", null, locks);
     Assert.assertEquals("Wrong AgentInfo", driver.getPlan().getQueryId(), locks.get(0).getAgentInfo());
   }
+  @Test
+  public void testMerge3Way01() throws Exception {
+    testMerge3Way(false);
+  }
+  @Test
+  public void testMerge3Way02() throws Exception {
+    testMerge3Way(true);
+  }
+
+  /**
+   * @param cc whether to cause a WW conflict or not
+   * @throws Exception
+   */
+  private void testMerge3Way(boolean cc) throws Exception {
+    dropTable(new String[] {"target","source", "source2"});
+    checkCmdOnDriver(driver.run("create table target (a int, b int) " +
+      "partitioned by (p int, q int) clustered by (a) into 2  buckets " +
+      "stored as orc TBLPROPERTIES ('transactional'='true')"));
+    //in practice we don't really care about the data in any of these tables (except as far as
+    //it creates partitions, the SQL being test is not actually executed and results of the
+    //wrt ACID metadata is supplied manually via addDynamicPartitions().  But having data makes
+    //it easier to follow the intent
+    checkCmdOnDriver(driver.run("insert into target partition(p,q) values (1,2,1,2), (3,4,1,2), (5,6,1,3), (7,8,2,2)"));
+    checkCmdOnDriver(driver.run("create table source (a int, b int, p int, q int)"));
+    checkCmdOnDriver(driver.run("insert into source values " +
+      // I-(1/2)            D-(1/2)    I-(1/3)     U-(1/3)     D-(2/2)     I-(1/1) - new part
+      "(9,10,1,2),        (3,4,1,2), (11,12,1,3), (5,13,1,3), (7,8,2,2), (14,15,1,1)"));
+    checkCmdOnDriver(driver.run("create table source2 (a int, b int, p int, q int)"));
+    checkCmdOnDriver(driver.run("insert into source2 values " +
+  //cc ? -:U-(1/2)     D-(1/2)         cc ? U-(1/3):-             D-(2/2)       I-(1/1) - new part 2
+      "(9,100,1,2),      (3,4,1,2),               (5,13,1,3),       (7,8,2,2), (14,15,2,1)"));
+    
+
+    long txnId1 = txnMgr.openTxn("T1");
+    checkCmdOnDriver(driver.compileAndRespond("merge into target t using source s on t.a=s.b " +
+      "when matched and t.a=5 then update set b=s.b " + //updates p=1/q=3
+      "when matched and t.a in (3,7) then delete " + //deletes from p=1/q=2, p=2/q=2
+      "when not matched and t.a >= 8 then insert values(s.a, s.b, s.p, s.q)"));//insert p=1/q=2, p=1/q=3 and new part 1/1
+    txnMgr.acquireLocks(driver.getPlan(), ctx, "T1");
+    List<ShowLocksResponseElement> locks = getLocks(txnMgr, true);
+    Assert.assertEquals("Unexpected lock count", 5, locks.size());
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks);
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "source", null, locks);
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=2", locks);
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=3", locks);
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=2/q=2", locks);
+
+    //start concurrent txn
+    DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    long txnId2 = txnMgr2.openTxn("T2");
+    checkCmdOnDriver(driver.compileAndRespond("merge into target t using source2 s on t.a=s.b " +
+      "when matched and t.a=" + (cc ? 5 : 9) + " then update set b=s.b " + //if conflict updates p=1/q=3 else update p=1/q=2
+      "when matched and t.a in (3,7) then delete " + //deletes from p=1/q=2, p=2/q=2
+      "when not matched and t.a >= 8 then insert values(s.a, s.b, s.p, s.q)"));//insert p=1/q=2, p=1/q=3 and new part 1/1
+    txnMgr2.acquireLocks(driver.getPlan(), ctx, "T1", false);
+    locks = getLocks(txnMgr2, true);
+    Assert.assertEquals("Unexpected lock count", 10, locks.size());
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks);
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "source", null, locks);
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=2", locks);
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=3", locks);
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=2/q=2", locks);
+
+    long extLockId = checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "target", null, locks).getLockid();
+    checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "source2", null, locks);
+    checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "target", "p=1/q=2", locks);
+    checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "target", "p=1/q=3", locks);
+    checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "target", "p=2/q=2", locks);
+
+    Assert.assertEquals(
+      "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " +
+        TxnDbUtil.queryToString("select * from TXN_COMPONENTS"),
+      0,
+      TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId1));
+    //complete 1st txn
+    AddDynamicPartitions adp = new AddDynamicPartitions(txnId1, "default", "target",
+      Collections.singletonList("p=1/q=3"));//update clause
+    adp.setOperationType(DataOperationType.UPDATE);
+    txnHandler.addDynamicPartitions(adp);
+    adp = new AddDynamicPartitions(txnId1, "default", "target",
+      Arrays.asList("p=1/q=2","p=2/q=2"));//delete clause
+    adp.setOperationType(DataOperationType.DELETE);
+    txnHandler.addDynamicPartitions(adp);
+    adp = new AddDynamicPartitions(txnId1, "default", "target",
+      Arrays.asList("p=1/q=2","p=1/q=3","p=1/q=1"));//insert clause
+    adp.setOperationType(DataOperationType.INSERT);
+    txnHandler.addDynamicPartitions(adp);
+    Assert.assertEquals(
+      "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " +
+        TxnDbUtil.queryToString("select * from TXN_COMPONENTS"),
+      1,
+      TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId1 +
+        " and tc_operation_type='u'"));
+    Assert.assertEquals(
+      "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " +
+        TxnDbUtil.queryToString("select * from TXN_COMPONENTS"),
+      2,
+      TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId1 +
+        " and tc_operation_type='d'"));
+    Assert.assertEquals(
+      "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " +
+        TxnDbUtil.queryToString("select * from TXN_COMPONENTS"),
+      3,
+      TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId1 +
+        " and tc_operation_type='i'"));
+    txnMgr.commitTxn();//commit T1
+    Assert.assertEquals(
+      "COMPLETED_TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " +
+        TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+      6,
+      TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + txnId1));
+    Assert.assertEquals(
+      "WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " +
+        TxnDbUtil.queryToString("select * from WRITE_SET"),
+      1,
+      TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_txnid=" + txnId1 +
+        " and ws_operation_type='u'"));
+    Assert.assertEquals(
+      "WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " +
+        TxnDbUtil.queryToString("select * from WRITE_SET"),
+      2,
+      TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_txnid=" + txnId1 +
+        " and ws_operation_type='d'"));
+
+    //re-check locks which were in Waiting state - should now be Acquired
+    ((DbLockManager)txnMgr2.getLockManager()).checkLock(extLockId);
+    locks = getLocks(txnMgr2, true);
+    Assert.assertEquals("Unexpected lock count", 5, locks.size());
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks);
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "source2", null, locks);
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=2", locks);
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=3", locks);
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=2/q=2", locks);
+
+    Assert.assertEquals(
+      "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " +
+        TxnDbUtil.queryToString("select * from TXN_COMPONENTS"),
+      0,
+      TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId2));
+    //complete 2nd txn
+    adp = new AddDynamicPartitions(txnId2, "default", "target",
+      Collections.singletonList(cc ? "p=1/q=3" : "p=1/p=2"));//update clause
+    adp.setOperationType(DataOperationType.UPDATE);
+    txnHandler.addDynamicPartitions(adp);
+    adp = new AddDynamicPartitions(txnId2, "default", "target",
+      Arrays.asList("p=1/q=2","p=2/q=2"));//delete clause
+    adp.setOperationType(DataOperationType.DELETE);
+    txnHandler.addDynamicPartitions(adp);
+    adp = new AddDynamicPartitions(txnId2, "default", "target",
+      Arrays.asList("p=1/q=2","p=1/q=3","p=1/q=1"));//insert clause
+    adp.setOperationType(DataOperationType.INSERT);
+    txnHandler.addDynamicPartitions(adp);
+    Assert.assertEquals(
+      "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " +
+        TxnDbUtil.queryToString("select * from TXN_COMPONENTS"),
+      1,
+      TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId2 +
+        " and tc_operation_type='u'"));
+    Assert.assertEquals(
+      "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " +
+        TxnDbUtil.queryToString("select * from TXN_COMPONENTS"),
+      2,
+      TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId2 +
+        " and tc_operation_type='d'"));
+    Assert.assertEquals(
+      "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " +
+        TxnDbUtil.queryToString("select * from TXN_COMPONENTS"),
+      3,
+      TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId2 +
+        " and tc_operation_type='i'"));
+
+    LockException expectedException = null;
+    try {
+      txnMgr2.commitTxn();//commit T2
+    }
+    catch (LockException e) {
+      expectedException = e;
+    }
+    if(cc) {
+      Assert.assertNotNull("didn't get exception", expectedException);
+      Assert.assertEquals("Transaction manager has aborted the transaction txnid:3.  Reason: " +
+        "Aborting [txnid:3,3] due to a write conflict on default/target/p=1/q=3 " +
+        "committed by [txnid:2,3] u/u", expectedException.getMessage());
+      Assert.assertEquals(
+        "COMPLETED_TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " +
+          TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+        0,
+        TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + txnId2));
+      Assert.assertEquals(
+        "WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " +
+          TxnDbUtil.queryToString("select * from WRITE_SET"),
+        0,
+        TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_txnid=" + txnId2));
+    }
+    else {
+      Assert.assertNull("Unexpected exception " + expectedException, expectedException);
+      Assert.assertEquals(
+        "COMPLETED_TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " +
+          TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+        6,
+        TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + txnId2));
+      Assert.assertEquals(
+        "WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " +
+          TxnDbUtil.queryToString("select * from WRITE_SET"),
+        1,
+        TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_txnid=" + txnId2 +
+          " and ws_operation_type='u'"));
+      Assert.assertEquals(
+        "WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " +
+          TxnDbUtil.queryToString("select * from WRITE_SET"),
+        2,
+        TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_txnid=" + txnId2 +
+          " and ws_operation_type='d'"));
+    }
+
+
+  }
+  @Test 
+  public void testMergeUnpartitioned01() throws Exception {
+    testMergeUnpartitioned(true);
+  }
+  @Test
+  public void testMergeUnpartitioned02() throws Exception {
+    testMergeUnpartitioned(false);
+  }
+
+  /**
+   * run a merge statement using un-partitioned target table and a concurrent op on the target
+   * Check that proper locks are acquired and Write conflict detection works and the state
+   * of internal table.
+   * @param causeConflict true to make 2 operations such that they update the same entity
+   * @throws Exception
+   */
+  private void testMergeUnpartitioned(boolean causeConflict) throws Exception {
+    dropTable(new String[] {"target","source"});
+    checkCmdOnDriver(driver.run("create table target (a int, b int) " +
+      "clustered by (a) into 2  buckets " +
+      "stored as orc TBLPROPERTIES ('transactional'='true')"));
+    checkCmdOnDriver(driver.run("insert into target values (1,2), (3,4), (5,6), (7,8)"));
+    checkCmdOnDriver(driver.run("create table source (a int, b int)"));
+    
+    long txnid1 = txnMgr.openTxn("T1");
+    if(causeConflict) {
+      checkCmdOnDriver(driver.compileAndRespond("update target set b = 2 where a=1"));
+    }
+    else {
+      checkCmdOnDriver(driver.compileAndRespond("insert into target values(9,10),(11,12)"));
+    }
+    txnMgr.acquireLocks(driver.getPlan(), ctx, "T1");
+    Assert.assertEquals(
+      "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid1) + "): " +
+        TxnDbUtil.queryToString("select * from TXN_COMPONENTS"),
+      1,//no DP, so it's populated from lock info
+      TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid1));
+
+    List<ShowLocksResponseElement> locks = getLocks(txnMgr, true);
+    Assert.assertEquals("Unexpected lock count", 1, locks.size());
+    checkLock(causeConflict ? LockType.SHARED_WRITE : LockType.SHARED_READ,
+      LockState.ACQUIRED, "default", "target", null, locks);
+
+    DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    //start a 2nd (overlapping) txn
+    long txnid2 = txnMgr2.openTxn("T2");
+    checkCmdOnDriver(driver.compileAndRespond("merge into target t using source s " +
+      "on t.a=s.a " +
+      "when matched then delete " +
+      "when not matched then insert values(s.a,s.b)"));
+    txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2", false);
+    locks = getLocks(txnMgr, true);
+
+    Assert.assertEquals("Unexpected lock count", 3, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", null, locks);
+    checkLock(LockType.SHARED_READ, causeConflict ? LockState.WAITING : LockState.ACQUIRED,
+      "default", "source", null, locks);
+    long extLockId = checkLock(LockType.SHARED_WRITE, causeConflict ? LockState.WAITING : LockState.ACQUIRED,
+      "default", "target", null, locks).getLockid();
+
+    txnMgr.commitTxn();//commit T1
+    
+    Assert.assertEquals("WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnid1) + "): " +
+        TxnDbUtil.queryToString("select * from WRITE_SET"),
+      causeConflict ? 1 : 0,//Inserts are not tracked by WRITE_SET
+      TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_txnid=" + txnid1 +
+        " and ws_operation_type=" + (causeConflict ? "'u'" : "'i'")));
+
+
+    //re-check locks which were in Waiting state - should now be Acquired
+    ((DbLockManager)txnMgr2.getLockManager()).checkLock(extLockId);
+    locks = getLocks(txnMgr, true);
+    Assert.assertEquals("Unexpected lock count", 2, locks.size());
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "source", null, locks);
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", null, locks);
+
+    Assert.assertEquals(
+      "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid2) + "): " +
+        TxnDbUtil.queryToString("select * from TXN_COMPONENTS"),
+      1,//
+      TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid2));
+    Assert.assertEquals(
+      "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid2) + "): " +
+        TxnDbUtil.queryToString("select * from TXN_COMPONENTS"),
+      1,//
+      TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid2 +
+      "and tc_operation_type='d'"));
+    
+    //complete T2 txn
+    LockException expectedException = null;
+    try {
+      txnMgr2.commitTxn();
+    }
+    catch (LockException e) {
+      expectedException = e;
+    }
+    if(causeConflict) {
+      Assert.assertTrue("Didn't get exception", expectedException != null);
+      Assert.assertEquals("Got wrong message code", ErrorMsg.TXN_ABORTED, expectedException.getCanonicalErrorMsg());
+      Assert.assertEquals("Exception msg didn't match",
+        "Aborting [txnid:3,3] due to a write conflict on default/target committed by [txnid:2,3] d/u",
+        expectedException.getCause().getMessage());
+    }
+    else {
+      Assert.assertEquals("WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnid1) + "): " +
+          TxnDbUtil.queryToString("select * from WRITE_SET"),
+        1,//Unpartitioned table: 1 row for Delete; Inserts are not tracked in WRITE_SET
+        TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_txnid=" + txnid2 +
+          " and ws_operation_type='d'"));
+    }
+  }
+  /**
+   * Check that DP with partial spec properly updates TXN_COMPONENTS
+   * @throws Exception
+   */
+  @Test
+  public void testDynamicPartitionInsert() throws Exception {
+    dropTable(new String[] {"target"});
+    checkCmdOnDriver(driver.run("create table target (a int, b int) " +
+      "partitioned by (p int, q int) clustered by (a) into 2  buckets " +
+      "stored as orc TBLPROPERTIES ('transactional'='true')"));
+    long txnid1 = txnMgr.openTxn("T1");
+    checkCmdOnDriver(driver.compileAndRespond("insert into target partition(p=1,q) values (1,2,2), (3,4,2), (5,6,3), (7,8,2)"));
+    txnMgr.acquireLocks(driver.getPlan(), ctx, "T1");
+    List<ShowLocksResponseElement> locks = getLocks(txnMgr, true);
+    Assert.assertEquals("Unexpected lock count", 1, locks.size());
+    //table is empty, so can only lock the table
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks);
+    Assert.assertEquals(
+      "HIVE_LOCKS mismatch(" + JavaUtils.txnIdToString(txnid1) + "): " +
+        TxnDbUtil.queryToString("select * from HIVE_LOCKS"),
+      1,
+      TxnDbUtil.countQueryAgent("select count(*) from HIVE_LOCKS where hl_txnid=" + txnid1));
+    txnMgr.rollbackTxn();
+    Assert.assertEquals(
+      "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid1) + "): " +
+        TxnDbUtil.queryToString("select * from TXN_COMPONENTS"),
+      0,
+      TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid1));
+    //now actually write to table to generate some partitions
+    checkCmdOnDriver(driver.run("insert into target partition(p=1,q) values (1,2,2), (3,4,2), (5,6,3), (7,8,2)"));
+    driver.run("select count(*) from target");
+    List<String> r = new ArrayList<>();
+    driver.getResults(r);
+    Assert.assertEquals("", "4", r.get(0));
+    Assert.assertEquals(//look in COMPLETED_TXN_COMPONENTS because driver.run() committed!!!!
+      "COMPLETED_TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid1 + 1) + "): " +
+        TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+      2,//2 distinct partitions created
+      //txnid+1 because we want txn used by previous driver.run("insert....)
+      TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + (txnid1 + 1)));
+
+
+    long txnid2 = txnMgr.openTxn("T1");
+    checkCmdOnDriver(driver.compileAndRespond("insert into target partition(p=1,q) values (10,2,2), (30,4,2), (50,6,3), (70,8,2)"));
+    txnMgr.acquireLocks(driver.getPlan(), ctx, "T1");
+    locks = getLocks(txnMgr, true);
+    Assert.assertEquals("Unexpected lock count", 1, locks.size());
+    //Plan is using DummyPartition, so can only lock the table... unfortunately
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks);
+    AddDynamicPartitions adp = new AddDynamicPartitions(txnid2, "default", "target", Arrays.asList("p=1/q=2","p=1/q=2"));
+    adp.setOperationType(DataOperationType.INSERT);
+    txnHandler.addDynamicPartitions(adp);
+    Assert.assertEquals(
+      "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid2) + "): " +
+        TxnDbUtil.queryToString("select * from TXN_COMPONENTS"),
+      2,//2 distinct partitions modified
+      TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid2));
+    txnMgr.commitTxn();
+  }
+  @Test
+  public void testMergePartitioned01() throws Exception {
+    testMergePartitioned(false);
+  }
+  @Test
+  public void testMergePartitioned02() throws Exception {
+    testMergePartitioned(true);
+  }
+  /**
+   * "run" an Update and Merge concurrently; Check that correct locks are acquired.
+   * Check state of auxiliary ACID tables.
+   * @param causeConflict - true to make the operations cause a Write conflict
+   * @throws Exception
+   */
+  private void testMergePartitioned(boolean causeConflict) throws Exception {
+    dropTable(new String[] {"target","source"});
+    checkCmdOnDriver(driver.run("create table target (a int, b int) " +
+      "partitioned by (p int, q int) clustered by (a) into 2  buckets " +
+      "stored as orc TBLPROPERTIES ('transactional'='true')"));
+    checkCmdOnDriver(driver.run("insert into target partition(p,q) values (1,2,1,2), (3,4,1,2), (5,6,1,3), (7,8,2,2)"));
+    checkCmdOnDriver(driver.run("create table source (a1 int, b1 int, p1 int, q1 int)"));
+
+    long txnId1 = txnMgr.openTxn("T1");
+    checkCmdOnDriver(driver.compileAndRespond("update target set b = 2 where p=1"));
+    txnMgr.acquireLocks(driver.getPlan(), ctx, "T1");
+    List<ShowLocksResponseElement> locks = getLocks(txnMgr, true);
+    Assert.assertEquals("Unexpected lock count", 2, locks.size());
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=2", locks);
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=3", locks);
+
+    DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    //start a 2nd (overlapping) txn
+    long txnid2 = txnMgr2.openTxn("T2");
+    checkCmdOnDriver(driver.compileAndRespond("merge into target using source " +
+      "on target.p=source.p1 and target.a=source.a1 " +
+      "when matched then update set b = 11 " +
+      "when not matched then insert values(a1,b1,p1,q1)"));
+    txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2", false);
+    locks = getLocks(txnMgr, true);
+    Assert.assertEquals("Unexpected lock count", 7, locks.size());
+    /**
+     * W locks from T1 are still there, so all locks from T2 block.
+     * The Update part of Merge requests W locks for each existing partition in target.
+     * The Insert part doesn't know which partitions may be written to: thus R lock on target table.
+     * */
+    checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "source", null, locks);
+    long extLockId = checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "target", null, locks).getLockid();
+
+    checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "target", "p=1/q=2", locks);
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=2", locks);
+    
+    checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "target", "p=1/q=3", locks);
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=3", locks);
+    
+    checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "target", "p=2/q=2", locks);
+
+    Assert.assertEquals(
+      "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " +
+        TxnDbUtil.queryToString("select * from TXN_COMPONENTS"),
+      0,//because it's using a DP write
+      TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId1));
+    //complete T1 transaction (simulate writing to 2 partitions)
+    AddDynamicPartitions adp = new AddDynamicPartitions(txnId1, "default", "target",
+      Arrays.asList("p=1/q=2","p=1/q=3"));
+    adp.setOperationType(DataOperationType.UPDATE);
+    txnHandler.addDynamicPartitions(adp);
+    Assert.assertEquals(
+      "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " +
+        TxnDbUtil.queryToString("select * from TXN_COMPONENTS"),
+      2,
+      TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId1 +
+        " and tc_operation_type='u'"));
+    txnMgr.commitTxn();//commit T1
+    Assert.assertEquals("WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " +
+      TxnDbUtil.queryToString("select * from WRITE_SET"),
+      2,//2 partitions updated
+      TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_txnid=" + txnId1 +
+      " and ws_operation_type='u'"));
+    
+
+    //re-check locks which were in Waiting state - should now be Acquired
+    ((DbLockManager)txnMgr2.getLockManager()).checkLock(extLockId);
+    locks = getLocks(txnMgr, true);
+    Assert.assertEquals("Unexpected lock count", 5, locks.size());
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "source", null, locks);
+    checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks);
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=2", locks);
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=3", locks);
+    checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=2/q=2", locks);
+
+
+    Assert.assertEquals(
+      "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid2) + "): " +
+        TxnDbUtil.queryToString("select * from TXN_COMPONENTS"),
+      0,//because it's using a DP write
+      TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid2));
+    //complete T2 txn
+    //simulate Insert into 2 partitions
+    adp = new AddDynamicPartitions(txnid2, "default", "target",
+      Arrays.asList("p=1/q=2","p=1/q=3"));
+    adp.setOperationType(DataOperationType.INSERT);
+    txnHandler.addDynamicPartitions(adp);
+    Assert.assertEquals(
+      "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid2) + "): " +
+        TxnDbUtil.queryToString("select * from TXN_COMPONENTS"),
+      2,
+      TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid2 + " and tc_operation_type='i'"));
+    //simulate Update of 1 partitions; depending on causeConflict, choose one of the partitions
+    //which was modified by the T1 update stmt or choose a non-conflicting one
+    adp = new AddDynamicPartitions(txnid2, "default", "target",
+      Collections.singletonList(causeConflict ? "p=1/q=2" : "p=1/q=1"));
+    adp.setOperationType(DataOperationType.UPDATE);
+    txnHandler.addDynamicPartitions(adp);
+    Assert.assertEquals(
+      "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid2) + "): " +
+        TxnDbUtil.queryToString("select * from TXN_COMPONENTS"),
+      1,
+      TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid2 + " and tc_operation_type='u'"));
+    
+    
+    LockException expectedException = null;
+    try {
+      txnMgr2.commitTxn();
+    }
+    catch (LockException e) {
+      expectedException = e;
+    }
+    if(causeConflict) {
+      Assert.assertTrue("Didn't get exception", expectedException != null);
+      Assert.assertEquals("Got wrong message code", ErrorMsg.TXN_ABORTED, expectedException.getCanonicalErrorMsg());
+      Assert.assertEquals("Exception msg didn't match",
+        "Aborting [txnid:3,3] due to a write conflict on default/target/p=1/q=2 committed by [txnid:2,3] u/u",
+        expectedException.getCause().getMessage());
+    }
+    else {
+      Assert.assertEquals("WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnid2) + "): " +
+          TxnDbUtil.queryToString("select * from WRITE_SET"),
+        1,//1 partitions updated
+        TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_txnid=" + txnid2 +
+          " and ws_operation_type='u'"));
+      Assert.assertEquals("WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnid2) + "): " +
+          TxnDbUtil.queryToString("select * from WRITE_SET"),
+        1,//1 partitions updated (and no other entries)
+        TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_txnid=" + txnid2));
+    }
+  }
+  //https://issues.apache.org/jira/browse/HIVE-15048
+  @Test
+  @Ignore("for some reason this fails with NPE in setUp() when run as part of the suite, but not standalone..")
+  public void testUpdateWithSubquery() throws Exception {
+    dropTable(new String[] {"target", "source"});
+    checkCmdOnDriver(driver.run("create table target (a int, b int) " +
+      "partitioned by (p int, q int) clustered by (a) into 2  buckets " +
+      "stored as orc TBLPROPERTIES ('transactional'='true')"));
+    checkCmdOnDriver(driver.run("create table source (a1 int, b1 int, p1 int, q1 int) clustered by (a1) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')"));
+
+    checkCmdOnDriver(driver.run("insert into target partition(p,q) values (1,2,1,2), (3,4,1,2), (5,6,1,3), (7,8,2,2)"));
+    
+    checkCmdOnDriver(driver.run(
+"update target set b = 1 where p in (select t.q1 from source t where t.a1=5)"));
+/**
+ * So the above query fails with invalid reference 'p' (in subquery)  (as as if u use t.p)
+ * But before it fails, here is inputs/outpus before/after UpdateDeleteSemanticAnalyzer
+* Before UDSA
+* inputs:  [default@target, default@target@p=1/q=2, default@target@p=1/q=3, default@target@p=2/q=2]
+* outputs: [default@target]
+* 
+* after UDSA
+* inputs:  [default@target, default@target@p=1/q=2, default@target@p=1/q=3, default@target@p=2/q=2]
+* outputs: [default@target@p=1/q=2, default@target@p=1/q=3, default@target@p=2/q=2]
+* 
+* So it looks like....
+*/
+    checkCmdOnDriver(driver.run(
+      "update source set b1 = 1 where p1 in (select t.q from target t where t.p=2)"));
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/test/org/apache/hadoop/hive/ql/parse/TestIUD.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestIUD.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestIUD.java
index 3d2e648..467de26 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestIUD.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestIUD.java
@@ -32,7 +32,6 @@ import org.junit.Test;
  */
 public class TestIUD {
   private static HiveConf conf;
-
   private ParseDriver pd;
 
   @BeforeClass
@@ -47,6 +46,9 @@ public class TestIUD {
   }
 
   ASTNode parse(String query) throws ParseException {
+    return parse(query, pd, conf);
+  }
+  static ASTNode parse(String query, ParseDriver pd, HiveConf conf) throws ParseException {
     ASTNode nd = null;
     try {
       nd = pd.parse(query, new Context(conf));

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/test/org/apache/hadoop/hive/ql/parse/TestMergeStatement.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestMergeStatement.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestMergeStatement.java
new file mode 100644
index 0000000..7481e1a
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestMergeStatement.java
@@ -0,0 +1,246 @@
+package org.apache.hadoop.hive.ql.parse;
+
+import org.antlr.runtime.tree.RewriteEmptyStreamException;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+
+/**
+ * Testing parsing for SQL Merge statement
+ */
+public class TestMergeStatement {
+  private static HiveConf conf;
+  private ParseDriver pd;
+
+  @BeforeClass
+  public static void initialize() {
+    conf = new HiveConf(SemanticAnalyzer.class);
+    SessionState.start(conf);
+  }
+
+  @Before
+  public void setup() throws SemanticException, IOException {
+    pd = new ParseDriver();
+  }
+
+  ASTNode parse(String query) throws ParseException {
+    return TestIUD.parse(query, pd, conf);
+  }
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @Test
+  public void test() throws ParseException {
+    ASTNode ast = parse(//using target.a breaks this
+      "MERGE INTO target USING source ON target.pk = source.pk WHEN MATCHED THEN UPDATE set a = source.b, c=d+1");
+    Assert.assertEquals(
+      "(tok_merge " +
+        "(tok_tabref (tok_tabname target)) " +
+        "(tok_tabref (tok_tabname source)) " +
+        "(= (. (tok_table_or_col target) pk) (. (tok_table_or_col source) pk)) " +
+        "(tok_matched " +
+          "(tok_update " +
+            "(tok_set_columns_clause " +
+              "(= (tok_table_or_col a) (. (tok_table_or_col source) b)) " +
+              "(= (tok_table_or_col c) (+ (tok_table_or_col d) 1))" +
+            ")" +
+          ")" +
+        ")" +
+      ")", ast.toStringTree());
+  }
+  @Test
+  public void test1() throws ParseException {
+    //testing MATCHED AND with CASE statement
+    ASTNode ast = parse(//using target.a breaks this
+      "MERGE INTO target USING source ON target.pk = source.pk WHEN MATCHED " +
+        "AND source.c2 < current_time() " +
+        "THEN UPDATE set a = source.b, b = case when c1 is null then c1 else c1 end");
+    Assert.assertEquals(
+      "(tok_merge " +
+        "(tok_tabref (tok_tabname target)) (tok_tabref (tok_tabname source)) (= (. (tok_table_or_col target) pk) (. (tok_table_or_col source) pk)) " +
+          "(tok_matched " +
+            "(tok_update " +
+              "(tok_set_columns_clause " +
+                "(= (tok_table_or_col a) (. (tok_table_or_col source) b)) " +
+                "(= (tok_table_or_col b) (tok_function when (tok_function tok_isnull (tok_table_or_col c1)) (tok_table_or_col c1) (tok_table_or_col c1)))" +
+              ")" +
+            ") " +
+          "(< (. (tok_table_or_col source) c2) (tok_function current_time)))" +
+        ")", ast.toStringTree());
+  }
+  @Test
+  public void test2() throws ParseException {
+    ASTNode
+      ast = parse("MERGE INTO target USING source ON target.pk = source.pk WHEN MATCHED THEN DELETE");
+    Assert.assertEquals(
+      "(tok_merge " +
+        "(tok_tabref (tok_tabname target)) (tok_tabref (tok_tabname source)) (= (. (tok_table_or_col target) pk) (. (tok_table_or_col source) pk)) " +
+        "(tok_matched " +
+        "tok_delete)" +
+        ")", ast.toStringTree());
+  }
+  @Test
+  public void test3() throws ParseException {
+    ASTNode
+      ast = parse("MERGE INTO target USING source ON target.pk = source.pk WHEN MATCHED AND target.a + source.b > 8 THEN DELETE");
+    Assert.assertEquals(
+      "(tok_merge " +
+        "(tok_tabref (tok_tabname target)) (tok_tabref (tok_tabname source)) (= (. (tok_table_or_col target) pk) (. (tok_table_or_col source) pk)) " +
+        "(tok_matched " +
+        "tok_delete " +
+        "(> (+ (. (tok_table_or_col target) a) (. (tok_table_or_col source) b)) 8))" +
+        ")", ast.toStringTree());
+  }
+  @Test
+  public void test4() throws ParseException {
+    ASTNode
+      ast = parse(
+      "MERGE INTO target USING source ON target.pk = source.pk WHEN NOT MATCHED THEN INSERT VALUES(source.a, case when source.b is null then target.b else source.b end)");
+    Assert.assertEquals(
+      "(tok_merge " +
+        "(tok_tabref (tok_tabname target)) (tok_tabref (tok_tabname source)) (= (. (tok_table_or_col target) pk) (. (tok_table_or_col source) pk)) " +
+        "(tok_not_matched " +
+          "(tok_insert " +
+            "(tok_value_row " +
+              "(. (tok_table_or_col source) a) " +
+                "(tok_function when " +
+                  "(tok_function tok_isnull (. (tok_table_or_col source) b)) (. (tok_table_or_col target) b) " +
+                  "(. (tok_table_or_col source) b)" +
+                ")" +
+              ")" +
+          ")" +
+        ")" +
+      ")", ast.toStringTree());
+
+  }
+  /**
+   * both UPDATE and INSERT
+   * @throws ParseException
+   */
+  @Test
+  public void test5() throws ParseException {
+    ASTNode
+      ast = parse(
+      "MERGE INTO target USING source ON target.pk = source.pk WHEN MATCHED THEN UPDATE set a = source.b, c=d+1 WHEN NOT MATCHED THEN INSERT VALUES(source.a, 2, current_date())");
+    Assert.assertEquals(
+      "(tok_merge " +
+        "(tok_tabref (tok_tabname target)) (tok_tabref (tok_tabname source)) (= (. (tok_table_or_col target) pk) (. (tok_table_or_col source) pk)) " +
+        "(tok_matched " +
+          "(tok_update " +
+            "(tok_set_columns_clause (= (tok_table_or_col a) (. (tok_table_or_col source) b)) (= (tok_table_or_col c) (+ (tok_table_or_col d) 1)))" +
+          ")" +
+        ") " +
+        "(tok_not_matched " +
+          "(tok_insert " +
+            "(tok_value_row " +
+              "(. (tok_table_or_col source) a) " +
+              "2 " +
+              "(tok_function current_date)" +
+            ")" +
+          ")" +
+        ")" + 
+      ")", ast.toStringTree());
+
+  }
+  @Test
+  public void testNegative() throws ParseException {
+    expectedException.expect(ParseException.class);
+    expectedException.expectMessage("line 1:74 cannot recognize input near 'INSERT' '<EOF>' '<EOF>' in WHEN MATCHED THEN clause");
+    ASTNode ast = parse("MERGE INTO target USING source ON target.pk = source.pk WHEN MATCHED THEN INSERT");
+  }
+  @Test
+  public void testNegative1() throws ParseException {
+    expectedException.expect(ParseException.class);
+    expectedException.expectMessage("line 1:78 mismatched input 'DELETE' expecting INSERT near 'THEN' in WHEN NOT MATCHED clause");
+    ASTNode ast = parse("MERGE INTO target USING source ON target.pk = source.pk WHEN NOT MATCHED THEN DELETE");
+  }
+  @Test
+  public void test8() throws ParseException {
+    ASTNode ast = parse("MERGE INTO target USING source ON target.pk = source.pk WHEN MATCHED AND a = 1 THEN UPDATE set a = b WHEN MATCHED THEN DELETE");
+  }
+  @Test
+  public void test9() throws ParseException {
+    ASTNode ast = parse("MERGE INTO target USING source ON target.pk = source.pk " +
+      "WHEN MATCHED AND a = 1 THEN UPDATE set a = b " +
+      "WHEN MATCHED THEN DELETE " +
+      "WHEN NOT MATCHED AND d < e THEN INSERT VALUES(1,2)");
+  }
+  @Test
+  public void test10() throws ParseException {
+    ASTNode ast = parse("MERGE INTO target USING source ON target.pk = source.pk " +
+      "WHEN MATCHED AND a = 1 THEN DELETE " +
+      "WHEN MATCHED THEN UPDATE set a = b " +
+      "WHEN NOT MATCHED AND d < e THEN INSERT VALUES(1,2)");
+  }
+  /**
+   * we always expect 0 or 1 update/delete WHEN clause and 0 or 1 insert WHEN clause (and 1 or 2 WHEN clauses altogether)
+   * @throws ParseException
+   */
+  @Test
+  public void testNegative3() throws ParseException {
+    expectedException.expect(ParseException.class);
+    expectedException.expectMessage("line 1:119 cannot recognize input near 'INSERT' 'VALUES' '(' in WHEN MATCHED THEN clause");
+    ASTNode ast = parse("MERGE INTO target USING source ON target.pk = source.pk WHEN MATCHED AND a = 1 THEN UPDATE set a = b WHEN MATCHED THEN INSERT VALUES(1,2)");
+  }
+  /**
+   * here we reverse the order of WHEN MATCHED/WHEN NOT MATCHED - should we allow it?
+   * @throws ParseException
+   */
+  @Test
+  public void testNegative4() throws ParseException {
+    expectedException.expect(ParseException.class);
+    expectedException.expectMessage("line 1:104 missing EOF at 'WHEN' near ')'");
+    ASTNode ast = parse(
+      "MERGE INTO target USING source ON target.pk = source.pk WHEN NOT MATCHED THEN INSERT VALUES(a,source.b) WHEN MATCHED THEN DELETE");
+  }
+
+  /**
+   * why does this fail but next one passes
+   * @throws ParseException
+   */
+  @Test
+  public void testNegative5() throws ParseException {
+    expectedException.expect(ParseException.class);
+    expectedException.expectMessage("line 1:103 mismatched input '+' expecting ) near 'b' in value row constructor");
+    //todo: why does this fail but next one passes?
+    ASTNode ast = parse(
+      "MERGE INTO target USING source ON target.pk = source.pk WHEN NOT MATCHED THEN INSERT VALUES(a,source.b + 1)");
+  }
+  @Test
+  public void test6() throws ParseException {
+    ASTNode ast = parse(
+      "MERGE INTO target USING source ON target.pk = source.pk WHEN NOT MATCHED THEN INSERT VALUES(a,(source.b + 1))");
+  }
+  @Test
+  public void testNegative6() throws ParseException {
+    expectedException.expect(RewriteEmptyStreamException.class);
+    expectedException.expectMessage("rule whenClauses");
+    ASTNode ast = parse(
+      "MERGE INTO target USING source ON target.pk = source.pk");
+  }
+  @Test
+  public void test7() throws ParseException {
+    ASTNode ast = parse("merge into acidTbl" + 
+      " using nonAcidPart2 source ON acidTbl.a = source.a2 " +
+      "WHEN MATCHED THEN UPDATE set b = source.b2 " +
+      "WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2)");
+    Assert.assertEquals(ast.toStringTree(), 
+      "(tok_merge " +
+        "(tok_tabref (tok_tabname acidtbl)) (tok_tabref (tok_tabname nonacidpart2) source) " +
+        "(= (. (tok_table_or_col acidtbl) a) (. (tok_table_or_col source) a2)) " +
+        "(tok_matched " +
+          "(tok_update " +
+            "(tok_set_columns_clause (= (tok_table_or_col b) (. (tok_table_or_col source) b2))))) " +
+        "(tok_not_matched " +
+          "(tok_insert " +
+            "(tok_value_row (. (tok_table_or_col source) a2) (. (tok_table_or_col source) b2)))))");
+  }
+}


[3/3] hive git commit: HIVE-14943 Base Implementation (of HIVE-10924) (Eugene Koifman, reviewed by Alan Gates)

Posted by ek...@apache.org.
HIVE-14943 Base Implementation (of HIVE-10924) (Eugene Koifman, reviewed by Alan Gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e00f909d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e00f909d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e00f909d

Branch: refs/heads/master
Commit: e00f909dd35ee46de5d9de493bb76e37ba4b6f74
Parents: 52ba014
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Sat Nov 12 12:20:01 2016 -0800
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Sat Nov 12 12:20:01 2016 -0800

----------------------------------------------------------------------
 .../hive/metastore/LockComponentBuilder.java    |   4 +
 .../metastore/txn/CompactionTxnHandler.java     |   5 +-
 .../hadoop/hive/metastore/txn/TxnHandler.java   |  51 +-
 .../java/org/apache/hadoop/hive/ql/Context.java |  63 +-
 .../java/org/apache/hadoop/hive/ql/Driver.java  |   4 +-
 .../org/apache/hadoop/hive/ql/ErrorMsg.java     |  11 +-
 .../apache/hadoop/hive/ql/exec/MoveTask.java    |   4 +
 .../apache/hadoop/hive/ql/hooks/ReadEntity.java |   3 +
 .../hadoop/hive/ql/hooks/WriteEntity.java       |  10 +
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java    |   9 +-
 .../hadoop/hive/ql/lockmgr/DummyTxnManager.java |   2 +-
 .../hadoop/hive/ql/lockmgr/HiveTxnManager.java  |   7 +-
 .../BucketingSortingReduceSinkOptimizer.java    |   7 +-
 .../apache/hadoop/hive/ql/parse/ASTNode.java    |   8 +
 .../hadoop/hive/ql/parse/FromClauseParser.g     |   6 +-
 .../org/apache/hadoop/hive/ql/parse/HiveLexer.g |   2 +
 .../apache/hadoop/hive/ql/parse/HiveParser.g    |  60 +-
 .../hadoop/hive/ql/parse/IdentifiersParser.g    |   7 +-
 .../hadoop/hive/ql/parse/QBParseInfo.java       |   7 +
 .../hadoop/hive/ql/parse/SelectClauseParser.g   |   2 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  | 155 +--
 .../hive/ql/parse/SemanticAnalyzerFactory.java  |   1 +
 .../ql/parse/UpdateDeleteSemanticAnalyzer.java  | 989 ++++++++++++++++---
 .../hadoop/hive/ql/plan/CreateTableDesc.java    |   4 +-
 .../metastore/txn/TestCompactionTxnHandler.java |   1 +
 .../apache/hadoop/hive/ql/TestTxnCommands.java  |  35 +
 .../apache/hadoop/hive/ql/TestTxnCommands2.java | 417 +++++++-
 .../ql/TestTxnCommands2WithSplitUpdate.java     |  13 +-
 .../hive/ql/lockmgr/TestDbTxnManager2.java      | 645 +++++++++++-
 .../apache/hadoop/hive/ql/parse/TestIUD.java    |   4 +-
 .../hive/ql/parse/TestMergeStatement.java       | 246 +++++
 31 files changed, 2464 insertions(+), 318 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/metastore/src/java/org/apache/hadoop/hive/metastore/LockComponentBuilder.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/LockComponentBuilder.java b/metastore/src/java/org/apache/hadoop/hive/metastore/LockComponentBuilder.java
index 3e8f193..e074152 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/LockComponentBuilder.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/LockComponentBuilder.java
@@ -102,6 +102,10 @@ public class LockComponentBuilder {
     partNameSet = true;
     return this;
   }
+  public LockComponentBuilder setIsDynamicPartitionWrite(boolean t) {
+    component.setIsDynamicPartitionWrite(t);
+    return this;
+  }
 
  /**
    * Get the constructed lock component.

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 75a4d87..9145fcc 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -43,9 +43,6 @@ class CompactionTxnHandler extends TxnHandler {
   static final private String CLASS_NAME = CompactionTxnHandler.class.getName();
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
 
-  // Always access COMPACTION_QUEUE before COMPLETED_TXN_COMPONENTS
-  // See TxnHandler for notes on how to deal with deadlocks.  Follow those notes.
-
   public CompactionTxnHandler() {
   }
 
@@ -428,7 +425,7 @@ class CompactionTxnHandler extends TxnHandler {
   }
 
   /**
-   * Clean up aborted transactions from txns that have no components in txn_components.  The reson such
+   * Clean up aborted transactions from txns that have no components in txn_components.  The reason such
    * txns exist can be that now work was done in this txn (e.g. Streaming opened TransactionBatch and
    * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called.
    */

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 547ee98..a815f2c 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -625,7 +625,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         rs = stmt.executeQuery(sqlGenerator.addLimitClause(1, "tc_operation_type " + conflictSQLSuffix));
         if (rs.next()) {
           close(rs);
-          //here means currently committing txn performed update/delete and we should check WW conflict
+          //if here it means currently committing txn performed update/delete and we should check WW conflict
           /**
            * This S4U will mutex with other commitTxn() and openTxns(). 
            * -1 below makes txn intervals look like [3,3] [4,4] if all txns are serial
@@ -653,7 +653,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
            */
           rs = stmt.executeQuery
             (sqlGenerator.addLimitClause(1, "committed.ws_txnid, committed.ws_commit_id, committed.ws_database," +
-              "committed.ws_table, committed.ws_partition, cur.ws_commit_id cur_ws_commit_id " +
+              "committed.ws_table, committed.ws_partition, cur.ws_commit_id cur_ws_commit_id, " +
+              "cur.ws_operation_type cur_op, committed.ws_operation_type committed_op " +
               "from WRITE_SET committed INNER JOIN WRITE_SET cur " +
               "ON committed.ws_database=cur.ws_database and committed.ws_table=cur.ws_table " +
               //For partitioned table we always track writes at partition level (never at table)
@@ -677,7 +678,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
               resource.append('/').append(partitionName);
             }
             String msg = "Aborting [" + JavaUtils.txnIdToString(txnid) + "," + rs.getLong(6) + "]" + " due to a write conflict on " + resource +
-              " committed by " + committedTxn;
+              " committed by " + committedTxn + " " + rs.getString(7) + "/" + rs.getString(8);
             close(rs);
             //remove WRITE_SET info for current txn since it's about to abort
             dbConn.rollback(undoWriteSetForCurrentTxn);
@@ -712,6 +713,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         int modCount = 0;
         if ((modCount = stmt.executeUpdate(s)) < 1) {
           //this can be reasonable for an empty txn START/COMMIT or read-only txn
+          //also an IUD with DP that didn't match any rows.
           LOG.info("Expected to move at least one record from txn_components to " +
             "completed_txn_components when committing txn! " + JavaUtils.txnIdToString(txnid));
         }
@@ -884,14 +886,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
         if (txnid > 0) {
           List<String> rows = new ArrayList<>();
-          /**
-           * todo QueryPlan has BaseSemanticAnalyzer which has acidFileSinks list of FileSinkDesc
-           * FileSinkDesc.table is ql.metadata.Table
-           * Table.tableSpec which is TableSpec, which has specType which is SpecType
-           * So maybe this can work to know that this is part of dynamic partition insert in which case
-           * we'll get addDynamicPartitions() call and should not write TXN_COMPONENTS here.
-           * In any case, that's an optimization for now;  will be required when adding multi-stmt txns
-           */
           // For each component in this lock request,
           // add an entry to the txn_components table
           for (LockComponent lc : rqst.getComponent()) {
@@ -909,7 +903,18 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
                 case INSERT:
                 case UPDATE:
                 case DELETE:
-                  updateTxnComponents = true;
+                  if(!lc.isSetIsDynamicPartitionWrite()) {
+                    //must be old client talking, i.e. we don't know if it's DP so be conservative
+                    updateTxnComponents = true;
+                  }
+                  else {
+                    /**
+                     * we know this is part of DP operation and so we'll get
+                     * {@link #addDynamicPartitions(AddDynamicPartitions)} call with the list
+                     * of partitions actually chaged.
+                     */
+                    updateTxnComponents = !lc.isIsDynamicPartitionWrite();
+                  }
                   break;
                 case SELECT:
                   updateTxnComponents = false;
@@ -1544,22 +1549,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         if(rqst.isSetOperationType()) {
           ot = OpertaionType.fromDataOperationType(rqst.getOperationType());
         }
-        
-        //what if a txn writes the same table > 1 time...(HIVE-9675) let's go with this for now, but really
-        //need to not write this in the first place, i.e. make this delete not needed
-        //see enqueueLockWithRetry() - that's where we write to TXN_COMPONENTS
-        String deleteSql = "delete from TXN_COMPONENTS where tc_txnid=" + rqst.getTxnid() + " and tc_database=" +
-          quoteString(rqst.getDbname()) + " and tc_table=" + quoteString(rqst.getTablename());
-        //we delete the entries made by enqueueLockWithRetry() since those are based on lock information which is
-        //much "wider" than necessary in a lot of cases.  Here on the other hand, we know exactly which
-        //partitions have been written to.  w/o this WRITE_SET would contain entries for partitions not actually
-        //written to
-        int modCount = stmt.executeUpdate(deleteSql);
         List<String> rows = new ArrayList<>();
         for (String partName : rqst.getPartitionnames()) {
           rows.add(rqst.getTxnid() + "," + quoteString(rqst.getDbname()) + "," + quoteString(rqst.getTablename()) +
             "," + quoteString(partName) + "," + quoteChar(ot.sqlConst));
         }
+        int modCount = 0;
+        //record partitions that were written to
         List<String> queries = sqlGenerator.createInsertValuesStmt(
           "TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type)", rows);
         for(String query : queries) {
@@ -2080,7 +2076,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     }
 
     public int compare(LockInfo info1, LockInfo info2) {
-      // We sort by state (acquired vs waiting) and then by LockType, they by id
+      // We sort by state (acquired vs waiting) and then by LockType, then by id
       if (info1.state == LockState.ACQUIRED &&
         info2.state != LockState .ACQUIRED) {
         return -1;
@@ -2285,6 +2281,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     /**
      * todo: Longer term we should pass this from client somehow - this would be an optimization;  once
      * that is in place make sure to build and test "writeSet" below using OperationType not LockType
+     * With SP we assume that the query modifies exactly the partitions it locked.  (not entirely
+     * realistic since Update/Delete may have some predicate that filters out all records out of
+     * some partition(s), but plausible).  For DP, we acquire locks very wide (all known partitions),
+     * but for most queries only a fraction will actually be updated.  #addDynamicPartitions() tells
+     * us exactly which ones were written to.  Thus using this trick to kill a query early for
+     * DP queries may be too restrictive.
      */
     boolean isPartOfDynamicPartitionInsert = true;
     try {
@@ -2567,6 +2569,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       (desiredLock.txnId != 0 && desiredLock.txnId == existingLock.txnId) ||
       //txnId=0 means it's a select or IUD which does not write to ACID table, e.g
       //insert overwrite table T partition(p=1) select a,b from T and autoCommit=true
+      // todo: fix comment as of HIVE-14988
       (desiredLock.txnId == 0 &&  desiredLock.extLockId == existingLock.extLockId);
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/Context.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 838d73e..4355d21 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -25,6 +25,7 @@ import java.net.URI;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -44,13 +45,13 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.TaskRunner;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager.Heartbeater;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.lockmgr.LockException;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
 import org.apache.hadoop.hive.ql.parse.ExplainConfiguration;
 import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
@@ -107,11 +108,6 @@ public class Context {
   // Transaction manager for this query
   protected HiveTxnManager hiveTxnManager;
 
-  // Used to track what type of acid operation (insert, update, or delete) we are doing.  Useful
-  // since we want to change where bucket columns are accessed in some operators and
-  // optimizations when doing updates and deletes.
-  private AcidUtils.Operation acidOperation = AcidUtils.Operation.NOT_ACID;
-
   private boolean needLockMgr;
 
   private AtomicInteger sequencer = new AtomicInteger();
@@ -129,6 +125,53 @@ public class Context {
   private Heartbeater heartbeater;
 
   private boolean skipTableMasking;
+  /**
+   * This determines the prefix of the
+   * {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.Phase1Ctx#dest}
+   * name for a given subtree of the AST.  Most of the times there is only 1 destination in a
+   * given tree but multi-insert has several and multi-insert representing MERGE must use
+   * different prefixes to encode the purpose of different Insert branches
+   */
+  private Map<ASTNode, DestClausePrefix> tree2DestNamePrefix;
+  public enum DestClausePrefix {
+    INSERT("insclause-"), UPDATE("updclause-"), DELETE("delclause-");
+    private final String prefix;
+    DestClausePrefix(String prefix) {
+      this.prefix = prefix;
+    }
+    public String toString() {
+      return prefix;
+    }
+  }
+  /**
+   * The suffix is always relative to a given ASTNode
+   */
+  public DestClausePrefix getDestNamePrefix(ASTNode curNode) {
+    //if there is no mapping, we want to default to "old" naming
+    assert curNode != null : "must supply curNode";
+    if(tree2DestNamePrefix == null || tree2DestNamePrefix.isEmpty()) {
+      return DestClausePrefix.INSERT;
+    }
+    do {
+      DestClausePrefix prefix = tree2DestNamePrefix.get(curNode);
+      if(prefix != null) {
+        return prefix;
+      }
+      curNode = (ASTNode) curNode.parent;
+    } while(curNode != null);
+    return DestClausePrefix.INSERT;
+  }
+  /**
+   * Will make SemanticAnalyzer.Phase1Ctx#dest in subtree rooted at 'tree' use 'prefix'
+   * @param tree
+   * @return previous prefix for 'tree' or null
+   */
+  public DestClausePrefix addDestNamePrefix(ASTNode tree, DestClausePrefix prefix) {
+    if(tree2DestNamePrefix == null) {
+      tree2DestNamePrefix = new IdentityHashMap<>();
+    }
+    return tree2DestNamePrefix.put(tree, prefix);
+  }
 
   public Context(Configuration conf) throws IOException {
     this(conf, generateExecutionId());
@@ -760,14 +803,6 @@ public class Context {
     this.tryCount = tryCount;
   }
 
-  public void setAcidOperation(AcidUtils.Operation op) {
-    acidOperation = op;
-  }
-
-  public AcidUtils.Operation getAcidOperation() {
-    return acidOperation;
-  }
-
   public String getCboInfo() {
     return cboInfo;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 923ef08..b77948b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1111,7 +1111,9 @@ public class Driver implements CommandProcessor {
       if (haveAcidWrite()) {
         for (FileSinkDesc desc : acidSinks) {
           desc.setTransactionId(txnMgr.getCurrentTxnId());
-          desc.setStatementId(txnMgr.getStatementId());
+          //it's possible to have > 1 FileSink writing to the same table/partition
+          //e.g. Merge stmt, multi-insert stmt when mixing DP and SP writes
+          desc.setStatementId(txnMgr.getWriteIdAndIncrement());
         }
       }
       /*Note, we have to record snapshot after lock acquisition to prevent lost update problem

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 7ed3907..97fcd55 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -398,7 +398,7 @@ public enum ErrorMsg {
   DISTINCT_NOT_SUPPORTED(10285, "Distinct keyword is not support in current context"),
   NONACID_COMPACTION_NOT_SUPPORTED(10286, "Compaction is not allowed on non-ACID table {0}.{1}", true),
 
-  UPDATEDELETE_PARSE_ERROR(10290, "Encountered parse error while parsing rewritten update or " +
+  UPDATEDELETE_PARSE_ERROR(10290, "Encountered parse error while parsing rewritten merge/update or " +
       "delete query"),
   UPDATEDELETE_IO_ERROR(10291, "Encountered I/O error while parsing rewritten update or " +
       "delete query"),
@@ -456,6 +456,10 @@ public enum ErrorMsg {
   REPLACE_MATERIALIZED_WITH_VIEW(10401, "Attempt to replace materialized view {0} with view", true),
   UPDATE_DELETE_VIEW(10402, "You cannot update or delete records in a view"),
   MATERIALIZED_VIEW_DEF_EMPTY(10403, "Query for the materialized view rebuild could not be retrieved"),
+  MERGE_PREDIACTE_REQUIRED(10404, "MERGE statement with both UPDATE and DELETE clauses " +
+    "requires \"AND <boolean>\" on the 1st WHEN MATCHED clause of <{0}>", true),
+  MERGE_TOO_MANY_DELETE(10405, "MERGE statment can have at most 1 WHEN MATCHED ... DELETE clause: <{0}>", true),
+  MERGE_TOO_MANY_UPDATE(10406, "MERGE statment can have at most 1 WHEN MATCHED ... UPDATE clause: <{0}>", true),
   //========================== 20000 range starts here ========================//
   SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."),
   SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. "
@@ -722,6 +726,11 @@ public enum ErrorMsg {
     sb.append(":");
     sb.append(getCharPositionInLine(tree));
   }
+  public static String renderPosition(ASTNode n) {
+    StringBuilder sb = new StringBuilder();
+    ErrorMsg.renderPosition(sb, n);
+    return sb.toString();
+  }
 
   public String getMsg(Tree tree) {
     return getMsg((ASTNode) tree);

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 8265af4..349f115 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -206,6 +206,10 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
     }
 
     Context ctx = driverContext.getCtx();
+    if(ctx.getHiveTxnManager().supportsAcid()) {
+      //Acid LM doesn't maintain getOutputLockObjects(); this 'if' just makes it more explicit
+      return;
+    }
     HiveLockManager lockMgr = ctx.getHiveTxnManager().getLockManager();
     WriteEntity output = ctx.getLoadTableOutputMap().get(ltd);
     List<HiveLockObj> lockObjects = ctx.getOutputLockObjects().get(output);

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
index 3d7de69..b805904 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
@@ -53,6 +53,9 @@ public class ReadEntity extends Entity implements Serializable {
   // important because in that case we shouldn't acquire a lock for it or authorize the read.
   // These will be handled by the output to the table instead.
   private boolean isUpdateOrDelete = false;
+  //https://issues.apache.org/jira/browse/HIVE-15048
+  public transient boolean isFromTopLevelQuery = true;
+
 
   // For views, the entities can be nested - by default, entities are at the top level
   // Must be deterministic order set for consistent q-test output across Java versions

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
index 9e18638..da8c1e2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
@@ -38,6 +38,7 @@ public class WriteEntity extends Entity implements Serializable {
   private static final Logger LOG = LoggerFactory.getLogger(WriteEntity.class);
 
   private boolean isTempURI = false;
+  private transient boolean isDynamicPartitionWrite = false;
 
   public static enum WriteType {
     DDL_EXCLUSIVE, // for use in DDL statements that require an exclusive lock,
@@ -221,5 +222,14 @@ public class WriteEntity extends Entity implements Serializable {
         throw new RuntimeException("Unknown operation " + op.toString());
     }
   }
+  public boolean isDynamicPartitionWrite() {
+    return isDynamicPartitionWrite;
+  }
+  public void setDynamicPartitionWrite(boolean t) {
+    isDynamicPartitionWrite = t;
+  }
+  public String toDetailedString() {
+    return toString() + " Type=" + getTyp() + " WriteType=" + getWriteType() + " isDP=" + isDynamicPartitionWrite();
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index da7505b..867e445 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -321,6 +321,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
       if(t != null && AcidUtils.isAcidTable(t)) {
         compBuilder.setIsAcid(true);
       }
+      compBuilder.setIsDynamicPartitionWrite(output.isDynamicPartitionWrite());
       LockComponent comp = compBuilder.build();
       LOG.debug("Adding lock component to lock request " + comp.toString());
       rqstBuilder.addLockComponent(comp);
@@ -335,9 +336,6 @@ public class DbTxnManager extends HiveTxnManagerImpl {
     }
 
     List<HiveLock> locks = new ArrayList<HiveLock>(1);
-    if(isTxnOpen()) {
-      statementId++;
-    }
     LockState lockState = lockMgr.lock(rqstBuilder.build(), queryId, isBlocking, locks);
     ctx.setHiveLocks(locks);
     return lockState;
@@ -650,8 +648,9 @@ public class DbTxnManager extends HiveTxnManagerImpl {
     return txnId;
   }
   @Override
-  public int getStatementId() {
-    return statementId;
+  public int getWriteIdAndIncrement() {
+    assert isTxnOpen();
+    return statementId++;
   }
 
   private static long getHeartbeatInterval(Configuration conf) throws LockException {

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
index 1d071a8..f001f59 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
@@ -63,7 +63,7 @@ class DummyTxnManager extends HiveTxnManagerImpl {
   }
 
   @Override
-  public int getStatementId() {
+  public int getWriteIdAndIncrement() {
     return 0;
   }
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index 9b4a97f..5b9ad60 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -210,8 +210,9 @@ public interface HiveTxnManager {
   long getCurrentTxnId();
 
   /**
-   * 0..N Id of current statement within currently opened transaction
+   * Should be though of more as a unique write operation ID in a given txn (at QueryPlan level).
+   * Each statement writing data within a multi statement txn should have a unique WriteId.
+   * Even a single statement, (e.g. Merge, multi-insert may generates several writes).
    */
-  int getStatementId();
-
+  int getWriteIdAndIncrement();
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
index da261bb..8f40998 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
@@ -401,9 +401,12 @@ public class BucketingSortingReduceSinkOptimizer extends Transform {
         return null;
       }
 
+      assert fsOp.getConf().getWriteType() == rsOp.getConf().getWriteType() :
+        "WriteType mismatch. fsOp is " + fsOp.getConf().getWriteType() +
+          "; rsOp is " + rsOp.getConf().getWriteType();
       // Don't do this optimization with updates or deletes
-      if (pGraphContext.getContext().getAcidOperation() == AcidUtils.Operation.UPDATE ||
-          pGraphContext.getContext().getAcidOperation() == AcidUtils.Operation.DELETE){
+      if (fsOp.getConf().getWriteType() == AcidUtils.Operation.UPDATE ||
+        fsOp.getConf().getWriteType() == AcidUtils.Operation.DELETE) {
         return null;
       }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java
index 62f9d14..0e6d903 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java
@@ -42,6 +42,7 @@ public class ASTNode extends CommonTree implements Node,Serializable {
   private transient ASTNode rootNode;
   private transient boolean isValidASTStr;
   private transient boolean visited = false;
+  transient String matchedText;
 
   public ASTNode() {
   }
@@ -347,4 +348,11 @@ public class ASTNode extends CommonTree implements Node,Serializable {
     return rootNode.getMemoizedSubString(startIndx, endIndx);
   }
 
+  /**
+   * The string that generated this node.
+   * Only set for a node if parser grammar sets it for a particular rule
+   */
+  public String getMatchedText() {
+    return matchedText;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g
index 26aca96..f8adb38 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g
@@ -19,7 +19,7 @@ parser grammar FromClauseParser;
 options
 {
 output=AST;
-ASTLabelType=CommonTree;
+ASTLabelType=ASTNode;
 backtrack=false;
 k=3;
 }
@@ -142,7 +142,7 @@ tableAlias
 
 fromSource
 @init { gParent.pushMsg("from source", state); }
-@after { gParent.popMsg(state); }
+@after { $fromSource.tree.matchedText = $fromSource.text; gParent.popMsg(state); }
     :
     (LPAREN KW_VALUES) => fromSource0
     | (LPAREN) => LPAREN joinSource RPAREN -> joinSource
@@ -278,7 +278,7 @@ searchCondition
 // INSERT INTO <table> (col1,col2,...) SELECT * FROM (VALUES(1,2,3),(4,5,6),...) as Foo(a,b,c)
 valueRowConstructor
 @init { gParent.pushMsg("value row constructor", state); }
-@after { gParent.popMsg(state); }
+@after { $valueRowConstructor.tree.matchedText = $valueRowConstructor.text; gParent.popMsg(state); }
     :
     LPAREN precedenceUnaryPrefixExpression (COMMA precedenceUnaryPrefixExpression)* RPAREN -> ^(TOK_VALUE_ROW precedenceUnaryPrefixExpression+)
     ;

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
index 63c32a8..b467c51 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
@@ -336,6 +336,8 @@ KW_KEY: 'KEY';
 KW_ABORT: 'ABORT';
 KW_EXTRACT: 'EXTRACT';
 KW_FLOOR: 'FLOOR';
+KW_MERGE: 'MERGE';
+KW_MATCHED: 'MATCHED';
 
 // Operators
 // NOTE: if you add a new function/operator, add it to sysFuncNames so that describe function _FUNC_ will work.

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
index 8aa39b0..bd53a36 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
@@ -20,7 +20,7 @@ options
 {
 tokenVocab=HiveLexer;
 output=AST;
-ASTLabelType=CommonTree;
+ASTLabelType=ASTNode;
 backtrack=false;
 k=3;
 }
@@ -384,6 +384,11 @@ TOK_ROLLBACK;
 TOK_SET_AUTOCOMMIT;
 TOK_CACHE_METADATA;
 TOK_ABORT_TRANSACTIONS;
+TOK_MERGE;
+TOK_MATCHED;
+TOK_NOT_MATCHED;
+TOK_UPDATE;
+TOK_DELETE;
 }
 
 
@@ -737,6 +742,7 @@ execStatement
     | deleteStatement
     | updateStatement
     | sqlTransactionStatement
+    | mergeStatement
     ;
 
 loadStatement
@@ -2670,3 +2676,55 @@ abortTransactionStatement
   :
   KW_ABORT KW_TRANSACTIONS ( Number )+ -> ^(TOK_ABORT_TRANSACTIONS ( Number )+)
   ;
+
+
+/*
+BEGIN SQL Merge statement
+*/
+mergeStatement
+@init { pushMsg("MERGE statement", state); }
+@after { popMsg(state); }
+   :
+   KW_MERGE KW_INTO tableName (KW_AS? identifier)? KW_USING fromSource KW_ON expression whenClauses ->
+    ^(TOK_MERGE ^(TOK_TABREF tableName identifier?) fromSource expression whenClauses)
+   ;
+/*
+Allow 0,1 or 2 WHEN MATCHED clauses and 0 or 1 WHEN NOT MATCHED
+Each WHEN clause may have AND <boolean predicate>.
+If 2 WHEN MATCHED clauses are present, 1 must be UPDATE the other DELETE and the 1st one
+must have AND <boolean predicate>
+*/
+whenClauses
+   :
+   (whenMatchedAndClause|whenMatchedThenClause)* whenNotMatchedClause?
+   ;
+whenNotMatchedClause
+@init { pushMsg("WHEN NOT MATCHED clause", state); }
+@after { popMsg(state); }
+   :
+  KW_WHEN KW_NOT KW_MATCHED (KW_AND expression)? KW_THEN KW_INSERT KW_VALUES valueRowConstructor ->
+    ^(TOK_NOT_MATCHED ^(TOK_INSERT valueRowConstructor) expression?)
+  ;
+whenMatchedAndClause
+@init { pushMsg("WHEN MATCHED AND clause", state); }
+@after { popMsg(state); }
+  :
+  KW_WHEN KW_MATCHED KW_AND expression KW_THEN updateOrDelete ->
+    ^(TOK_MATCHED updateOrDelete expression)
+  ;
+whenMatchedThenClause
+@init { pushMsg("WHEN MATCHED THEN clause", state); }
+@after { popMsg(state); }
+  :
+  KW_WHEN KW_MATCHED KW_THEN updateOrDelete ->
+     ^(TOK_MATCHED updateOrDelete)
+  ;
+updateOrDelete
+   :
+   KW_UPDATE setColumnsClause -> ^(TOK_UPDATE setColumnsClause)
+   |
+   KW_DELETE -> TOK_DELETE
+   ;
+/*
+END SQL Merge statement
+*/

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
index 2e40aa5..aa92739 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
@@ -19,7 +19,7 @@ parser grammar IdentifiersParser;
 options
 {
 output=AST;
-ASTLabelType=CommonTree;
+ASTLabelType=ASTNode;
 backtrack=false;
 k=3;
 }
@@ -383,7 +383,7 @@ intervalQualifiers
 
 expression
 @init { gParent.pushMsg("expression specification", state); }
-@after { gParent.popMsg(state); }
+@after { $expression.tree.matchedText = $expression.text; gParent.popMsg(state); }
     :
     precedenceOrExpression
     ;
@@ -459,6 +459,7 @@ precedencePlusOperator
     ;
 
 precedencePlusExpression
+@after { $precedencePlusExpression.tree.matchedText = $precedencePlusExpression.text; }
     :
     precedenceStarExpression (precedencePlusOperator^ precedenceStarExpression)*
     ;
@@ -759,6 +760,8 @@ nonReserved
     | KW_VALIDATE
     | KW_NOVALIDATE
     | KW_KEY
+    | KW_MERGE
+    | KW_MATCHED
 ;
 
 //The following SQL2011 reserved keywords are used as function name only, but not as identifiers.

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
index 3a0402e..f549dff 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
@@ -181,6 +181,9 @@ public class QBParseInfo {
     insertIntoTables.put(fullName.toLowerCase(), ast);
   }
 
+  /**
+   * See also {@link #getInsertOverwriteTables()}
+   */
   public boolean isInsertIntoTable(String dbName, String table) {
     String fullName = dbName + "." + table;
     return insertIntoTables.containsKey(fullName.toLowerCase());
@@ -188,6 +191,7 @@ public class QBParseInfo {
 
   /**
    * Check if a table is in the list to be inserted into
+   * See also {@link #getInsertOverwriteTables()}
    * @param fullTableName table name in dbname.tablename format
    * @return
    */
@@ -640,6 +644,9 @@ public class QBParseInfo {
     this.isPartialScanAnalyzeCommand = isPartialScanAnalyzeCommand;
   }
 
+  /**
+   * See also {@link #isInsertIntoTable(String)}
+   */
   public Map<String, ASTNode> getInsertOverwriteTables() {
     return insertOverwriteTables;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/SelectClauseParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SelectClauseParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/SelectClauseParser.g
index 3c6fa39..2c2e856 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SelectClauseParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SelectClauseParser.g
@@ -19,7 +19,7 @@ parser grammar SelectClauseParser;
 options
 {
 output=AST;
-ASTLabelType=CommonTree;
+ASTLabelType=ASTNode;
 backtrack=false;
 k=3;
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 17dfd03..8f5542b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.hive.conf.HiveConf.StrictChecks;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -72,6 +73,7 @@ import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryProperties;
 import org.apache.hadoop.hive.ql.QueryState;
@@ -679,17 +681,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     this.ast = newAST;
   }
 
-  /**
-   * Goes though the tabref tree and finds the alias for the table. Once found,
-   * it records the table name-> alias association in aliasToTabs. It also makes
-   * an association from the alias to the table AST in parse info.
-   *
-   * @return the alias of the table
-   */
-  private String processTable(QB qb, ASTNode tabref) throws SemanticException {
-    // For each table reference get the table name
-    // and the alias (if alias is not present, the table name
-    // is used as an alias)
+  int[] findTabRefIdxs(ASTNode tabref) {
+    assert tabref.getType() == HiveParser.TOK_TABREF;
     int aliasIndex = 0;
     int propsIndex = -1;
     int tsampleIndex = -1;
@@ -706,11 +699,12 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         aliasIndex = index;
       }
     }
-
+    return new int[] {aliasIndex, propsIndex, tsampleIndex, ssampleIndex};
+  }
+  String findSimpleTableName(ASTNode tabref, int aliasIndex) {
+    assert tabref.getType() == HiveParser.TOK_TABREF;
     ASTNode tableTree = (ASTNode) (tabref.getChild(0));
 
-    String tabIdName = getUnescapedName(tableTree).toLowerCase();
-
     String alias;
     if (aliasIndex != 0) {
       alias = unescapeIdentifier(tabref.getChild(aliasIndex).getText());
@@ -718,6 +712,30 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     else {
       alias = getUnescapedUnqualifiedTableName(tableTree);
     }
+    return alias;
+  }
+  /**
+   * Goes though the tabref tree and finds the alias for the table. Once found,
+   * it records the table name-> alias association in aliasToTabs. It also makes
+   * an association from the alias to the table AST in parse info.
+   *
+   * @return the alias of the table
+   */
+  private String processTable(QB qb, ASTNode tabref) throws SemanticException {
+    // For each table reference get the table name
+    // and the alias (if alias is not present, the table name
+    // is used as an alias)
+    int[] indexes = findTabRefIdxs(tabref);
+    int aliasIndex = indexes[0];
+    int propsIndex = indexes[1];
+    int tsampleIndex = indexes[2];
+    int ssampleIndex = indexes[3];
+
+    ASTNode tableTree = (ASTNode) (tabref.getChild(0));
+
+    String tabIdName = getUnescapedName(tableTree).toLowerCase();
+
+    String alias = findSimpleTableName(tabref, aliasIndex);
 
     if (propsIndex >= 0) {
       Tree propsAST = tabref.getChild(propsIndex);
@@ -1423,7 +1441,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         doPhase1GetColumnAliasesFromSelect(ast, qbp);
         qbp.setAggregationExprsForClause(ctx_1.dest, aggregations);
         qbp.setDistinctFuncExprsForClause(ctx_1.dest,
-        doPhase1GetDistinctFuncExprs(aggregations));
+          doPhase1GetDistinctFuncExprs(aggregations));
         break;
 
       case HiveParser.TOK_WHERE:
@@ -1438,7 +1456,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         qbp.addInsertIntoTable(tab_name, ast);
 
       case HiveParser.TOK_DESTINATION:
-        ctx_1.dest = "insclause-" + ctx_1.nextNum;
+        ctx_1.dest = this.ctx.getDestNamePrefix(ast).toString() + ctx_1.nextNum;
         ctx_1.nextNum++;
         boolean isTmpFileDest = false;
         if (ast.getChildCount() > 0 && ast.getChild(0) instanceof ASTNode) {
@@ -1945,25 +1963,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
           throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(alias));
         }
       }
-
-      // Disallow INSERT INTO on bucketized tables
-      boolean isAcid = AcidUtils.isAcidTable(tab);
-      boolean isTableWrittenTo = qb.getParseInfo().isInsertIntoTable(tab.getDbName(), tab.getTableName());
-      if (isTableWrittenTo &&
-          tab.getNumBuckets() > 0 && !isAcid) {
-        throw new SemanticException(ErrorMsg.INSERT_INTO_BUCKETIZED_TABLE.
-            getMsg("Table: " + tabName));
-      }
-      // Disallow update and delete on non-acid tables
-      if ((updating() || deleting()) && !isAcid && isTableWrittenTo) {
-        //isTableWrittenTo: delete from acidTbl where a in (select id from nonAcidTable)
-        //so only assert this if we are actually writing to this table
-        // Whether we are using an acid compliant transaction manager has already been caught in
-        // UpdateDeleteSemanticAnalyzer, so if we are updating or deleting and getting nonAcid
-        // here, it means the table itself doesn't support it.
-        throw new SemanticException(ErrorMsg.ACID_OP_ON_NONACID_TABLE, tabName);
-      }
-
      if (tab.isView()) {
         if (qb.getParseInfo().isAnalyzeCommand()) {
           throw new SemanticException(ErrorMsg.ANALYZE_VIEW.getMsg());
@@ -2086,6 +2085,21 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
                 .getMsg(ast, "The class is " + outputFormatClass.toString()));
           }
 
+          boolean isTableWrittenTo = qb.getParseInfo().isInsertIntoTable(ts.tableHandle.getDbName(),
+            ts.tableHandle.getTableName());
+          isTableWrittenTo |= (qb.getParseInfo().getInsertOverwriteTables().
+            get(getUnescapedName((ASTNode) ast.getChild(0), ts.tableHandle.getDbName())) != null);
+          assert isTableWrittenTo :
+            "Inconsistent data structure detected: we are writing to " + ts.tableHandle  + " in " +
+              name + " but it's not in isInsertIntoTable() or getInsertOverwriteTables()";
+          // Disallow update and delete on non-acid tables
+          boolean isAcid = AcidUtils.isAcidTable(ts.tableHandle);
+          if ((updating(name) || deleting(name)) && !isAcid) {
+            // Whether we are using an acid compliant transaction manager has already been caught in
+            // UpdateDeleteSemanticAnalyzer, so if we are updating or deleting and getting nonAcid
+            // here, it means the table itself doesn't support it.
+            throw new SemanticException(ErrorMsg.ACID_OP_ON_NONACID_TABLE, ts.tableName);
+          }
           // TableSpec ts is got from the query (user specified),
           // which means the user didn't specify partitions in their query,
           // but whether the table itself is partitioned is not know.
@@ -6421,7 +6435,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
 
     if (dest_tab.getNumBuckets() > 0) {
       enforceBucketing = true;
-      if (updating() || deleting()) {
+      if (updating(dest) || deleting(dest)) {
         partnCols = getPartitionColsFromBucketColsForUpdateDelete(input, true);
       } else {
         partnCols = getPartitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input, true);
@@ -6472,7 +6486,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         nullOrder.append(sortOrder == BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC ? 'a' : 'z');
       }
       input = genReduceSinkPlan(input, partnCols, sortCols, order.toString(), nullOrder.toString(),
-              maxReducers, (AcidUtils.isAcidTable(dest_tab) ? getAcidType() : AcidUtils.Operation.NOT_ACID));
+              maxReducers, (AcidUtils.isAcidTable(dest_tab) ? getAcidType(dest) : AcidUtils.Operation.NOT_ACID));
       reduceSinkOperatorsAddedByEnforceBucketingSorting.add((ReduceSinkOperator)input.getParentOperators().get(0));
       ctx.setMultiFileSpray(multiFileSpray);
       ctx.setNumFiles(numFiles);
@@ -6488,7 +6502,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
 
     if ((dest_tab.getNumBuckets() > 0)) {
       enforceBucketing = true;
-      if (updating() || deleting()) {
+      if (updating(dest) || deleting(dest)) {
         partnColsNoConvert = getPartitionColsFromBucketColsForUpdateDelete(input, false);
       } else {
         partnColsNoConvert = getPartitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input,
@@ -6646,7 +6660,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       if (!isNonNativeTable) {
         AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID;
         if (destTableIsAcid) {
-          acidOp = getAcidType(table_desc.getOutputFileFormatClass());
+          acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest);
           checkAcidConstraints(qb, table_desc, dest_tab);
         }
         ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp);
@@ -6666,7 +6680,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       // in the case of DP, we will register WriteEntity in MoveTask when the
       // list of dynamically created partitions are known.
       if ((dpCtx == null || dpCtx.getNumDPCols() == 0)) {
-        output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable));
+        output = new WriteEntity(dest_tab,  determineWriteType(ltd, isNonNativeTable, dest));
         if (!outputs.add(output)) {
           throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
               .getMsg(dest_tab.getTableName()));
@@ -6675,8 +6689,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       if ((dpCtx != null) && (dpCtx.getNumDPCols() >= 0)) {
         // No static partition specified
         if (dpCtx.getNumSPCols() == 0) {
-          output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable), false);
+          output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable, dest), false);
           outputs.add(output);
+          output.setDynamicPartitionWrite(true);
         }
         // part of the partition specified
         // Create a DummyPartition in this case. Since, the metastore does not store partial
@@ -6689,7 +6704,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
                 new DummyPartition(dest_tab, dest_tab.getDbName()
                     + "@" + dest_tab.getTableName() + "@" + ppath,
                     partSpec);
-            output = new WriteEntity(p, getWriteType(), false);
+            output = new WriteEntity(p, getWriteType(dest), false);
+            output.setDynamicPartitionWrite(true);
             outputs.add(output);
           } catch (HiveException e) {
             throw new SemanticException(e.getMessage(), e);
@@ -6753,7 +6769,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
           dest_part.isStoredAsSubDirectories(), conf);
       AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID;
       if (destTableIsAcid) {
-        acidOp = getAcidType(table_desc.getOutputFileFormatClass());
+        acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest);
         checkAcidConstraints(qb, table_desc, dest_tab);
       }
       ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp);
@@ -6762,9 +6778,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       ltd.setLbCtx(lbCtx);
 
       loadTableWork.add(ltd);
-
       if (!outputs.add(new WriteEntity(dest_part,
-        determineWriteType(ltd, dest_tab.isNonNative())))) {
+        determineWriteType(ltd, dest_tab.isNonNative(), dest)))) {
+
         throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
             .getMsg(dest_tab.getTableName() + "@" + dest_part.getName()));
       }
@@ -6925,7 +6941,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
 
     ArrayList<ColumnInfo> vecCol = new ArrayList<ColumnInfo>();
 
-    if (updating() || deleting()) {
+    if (updating(dest) || deleting(dest)) {
       vecCol.add(new ColumnInfo(VirtualColumn.ROWID.getName(), VirtualColumn.ROWID.getTypeInfo(),
           "", true));
     } else {
@@ -6978,8 +6994,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     // If this is an insert, update, or delete on an ACID table then mark that so the
     // FileSinkOperator knows how to properly write to it.
     if (destTableIsAcid) {
-      AcidUtils.Operation wt = updating() ? AcidUtils.Operation.UPDATE :
-          (deleting() ? AcidUtils.Operation.DELETE : AcidUtils.Operation.INSERT);
+      AcidUtils.Operation wt = updating(dest) ? AcidUtils.Operation.UPDATE :
+          (deleting(dest) ? AcidUtils.Operation.DELETE : AcidUtils.Operation.INSERT);
       fileSinkDesc.setWriteType(wt);
       acidFileSinks.add(fileSinkDesc);
     }
@@ -7150,7 +7166,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     }
 
     // The numbers of input columns and output columns should match for regular query
-    if (!updating() && !deleting() && inColumnCnt != outColumnCnt) {
+    if (!updating(dest) && !deleting(dest) && inColumnCnt != outColumnCnt) {
       String reason = "Table " + dest + " has " + outColumnCnt
           + " columns, but query has " + inColumnCnt + " columns.";
       throw new SemanticException(ErrorMsg.TARGET_TABLE_COLUMN_MISMATCH.getMsg(
@@ -7169,18 +7185,18 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         MetadataTypedColumnsetSerDe.class);
     boolean isLazySimpleSerDe = table_desc.getDeserializerClass().equals(
         LazySimpleSerDe.class);
-    if (!isMetaDataSerDe && !deleting()) {
+    if (!isMetaDataSerDe && !deleting(dest)) {
 
       // If we're updating, add the ROW__ID expression, then make the following column accesses
       // offset by 1 so that we don't try to convert the ROW__ID
-      if (updating()) {
+      if (updating(dest)) {
         expressions.add(new ExprNodeColumnDesc(rowFields.get(0).getType(),
             rowFields.get(0).getInternalName(), "", true));
       }
 
       // here only deals with non-partition columns. We deal with partition columns next
       for (int i = 0; i < columnNumber; i++) {
-        int rowFieldsOffset = updating() ? i + 1 : i;
+        int rowFieldsOffset = updating(dest) ? i + 1 : i;
         ObjectInspector tableFieldOI = tableFields.get(i)
             .getFieldObjectInspector();
         TypeInfo tableFieldTypeInfo = TypeInfoUtils
@@ -7218,7 +7234,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     // deal with dynamic partition columns: convert ExprNodeDesc type to String??
     if (dynPart && dpCtx != null && dpCtx.getNumDPCols() > 0) {
       // DP columns starts with tableFields.size()
-      for (int i = tableFields.size() + (updating() ? 1 : 0); i < rowFields.size(); ++i) {
+      for (int i = tableFields.size() + (updating(dest) ? 1 : 0); i < rowFields.size(); ++i) {
         TypeInfo rowFieldTypeInfo = rowFields.get(i).getType();
         ExprNodeDesc column = new ExprNodeColumnDesc(
             rowFieldTypeInfo, rowFields.get(i).getInternalName(), "", true);
@@ -10525,7 +10541,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
   }
 
   /**
-   * Planner specific stuff goen in here.
+   * Planner specific stuff goes in here.
    */
   static class PlannerContext {
     protected ASTNode   child;
@@ -13045,18 +13061,19 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     }
   }
 
-  private WriteEntity.WriteType determineWriteType(LoadTableDesc ltd, boolean isNonNativeTable) {
+  private WriteEntity.WriteType determineWriteType(LoadTableDesc ltd, boolean isNonNativeTable, String dest) {
     // Don't know the characteristics of non-native tables,
     // and don't have a rational way to guess, so assume the most
     // conservative case.
     if (isNonNativeTable) return WriteEntity.WriteType.INSERT_OVERWRITE;
-    else return (ltd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE : getWriteType());
-  }
-  private WriteEntity.WriteType getWriteType() {
-    return updating() ? WriteEntity.WriteType.UPDATE :
-      (deleting() ? WriteEntity.WriteType.DELETE : WriteEntity.WriteType.INSERT);
+    else return (ltd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE :
+      getWriteType(dest));
   }
 
+  private WriteEntity.WriteType getWriteType(String dest) {
+    return updating(dest) ? WriteEntity.WriteType.UPDATE :
+      (deleting(dest) ? WriteEntity.WriteType.DELETE : WriteEntity.WriteType.INSERT);
+  }
   private boolean isAcidOutputFormat(Class<? extends OutputFormat> of) {
     Class<?>[] interfaces = of.getInterfaces();
     for (Class<?> iface : interfaces) {
@@ -13069,28 +13086,28 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
 
   // Note that this method assumes you have already decided this is an Acid table.  It cannot
   // figure out if a table is Acid or not.
-  private AcidUtils.Operation getAcidType() {
-    return deleting() ? AcidUtils.Operation.DELETE :
-        (updating() ? AcidUtils.Operation.UPDATE :
+  private AcidUtils.Operation getAcidType(String destination) {
+    return deleting(destination) ? AcidUtils.Operation.DELETE :
+        (updating(destination) ? AcidUtils.Operation.UPDATE :
             AcidUtils.Operation.INSERT);
   }
 
-  private AcidUtils.Operation getAcidType(Class<? extends OutputFormat> of) {
+  private AcidUtils.Operation getAcidType(Class<? extends OutputFormat> of, String dest) {
     if (SessionState.get() == null || !SessionState.get().getTxnMgr().supportsAcid()) {
       return AcidUtils.Operation.NOT_ACID;
     } else if (isAcidOutputFormat(of)) {
-      return getAcidType();
+      return getAcidType(dest);
     } else {
       return AcidUtils.Operation.NOT_ACID;
     }
   }
 
-  protected boolean updating() {
-    return false;
+  protected boolean updating(String destination) {
+    return destination.startsWith(Context.DestClausePrefix.UPDATE.toString());
   }
 
-  protected boolean deleting() {
-    return false;
+  protected boolean deleting(String destination) {
+    return destination.startsWith(Context.DestClausePrefix.DELETE.toString());
   }
 
   // Make sure the proper transaction manager that supports ACID is being used

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
index 4f0ead0..ed01a31 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
@@ -296,6 +296,7 @@ public final class SemanticAnalyzerFactory {
 
       case HiveParser.TOK_UPDATE_TABLE:
       case HiveParser.TOK_DELETE_FROM:
+      case HiveParser.TOK_MERGE:
         return new UpdateDeleteSemanticAnalyzer(queryState);
 
       case HiveParser.TOK_START_TRANSACTION:


[2/3] hive git commit: HIVE-14943 Base Implementation (of HIVE-10924) (Eugene Koifman, reviewed by Alan Gates)

Posted by ek...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
index 5b874e4..55a3735 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
@@ -18,7 +18,9 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -26,15 +28,16 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConfUtil;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.Entity;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
@@ -46,7 +49,7 @@ import org.apache.hadoop.hive.ql.session.SessionState;
 
 /**
  * A subclass of the {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer} that just handles
- * update and delete statements.  It works by rewriting the updates and deletes into insert
+ * update, delete and merge statements.  It works by rewriting the updates and deletes into insert
  * statements (since they are actually inserts) and then doing some patch up to make them work as
  * updates and deletes instead.
  */
@@ -70,46 +73,249 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
       switch (tree.getToken().getType()) {
         case HiveParser.TOK_DELETE_FROM:
           analyzeDelete(tree);
-          return;
-
+          break;
         case HiveParser.TOK_UPDATE_TABLE:
           analyzeUpdate(tree);
-          return;
-
+          break;
+        case HiveParser.TOK_MERGE:
+          analyzeMerge(tree);
+          break;
         default:
           throw new RuntimeException("Asked to parse token " + tree.getName() + " in " +
               "UpdateDeleteSemanticAnalyzer");
       }
+      cleanUpMetaColumnAccessControl();
+
     }
   }
-
-  @Override
-  protected boolean updating() {
-    return ctx.getAcidOperation() == AcidUtils.Operation.UPDATE;
+  private boolean updating() {
+    return currentOperation == Operation.UPDATE;
   }
-
-  @Override
-  protected boolean deleting() {
-    return ctx.getAcidOperation() == AcidUtils.Operation.DELETE;
+  private boolean deleting() {
+    return currentOperation == Operation.DELETE;
   }
 
   private void analyzeUpdate(ASTNode tree) throws SemanticException {
-    ctx.setAcidOperation(AcidUtils.Operation.UPDATE);
+    currentOperation = Operation.UPDATE;
     reparseAndSuperAnalyze(tree);
   }
 
   private void analyzeDelete(ASTNode tree) throws SemanticException {
-    ctx.setAcidOperation(AcidUtils.Operation.DELETE);
+    currentOperation = Operation.DELETE;
     reparseAndSuperAnalyze(tree);
   }
+  /**
+   * Append list of partition columns to Insert statement, i.e. the 1st set of partCol1,partCol2
+   * INSERT INTO T PARTITION(partCol1,partCol2...) SELECT col1, ... partCol1,partCol2...
+   */
+  private void addPartitionColsToInsert(List<FieldSchema> partCols, StringBuilder rewrittenQueryStr) {
+    // If the table is partitioned we have to put the partition() clause in
+    if (partCols != null && partCols.size() > 0) {
+      rewrittenQueryStr.append(" partition (");
+      boolean first = true;
+      for (FieldSchema fschema : partCols) {
+        if (first)
+          first = false;
+        else
+          rewrittenQueryStr.append(", ");
+        //would be nice if there was a way to determine if quotes are needed
+        rewrittenQueryStr.append(HiveUtils.unparseIdentifier(fschema.getName(), this.conf));
+      }
+      rewrittenQueryStr.append(")");
+    }
+  }
+  /**
+   * Append list of partition columns to Insert statement, i.e. the 2nd set of partCol1,partCol2
+   * INSERT INTO T PARTITION(partCol1,partCol2...) SELECT col1, ... partCol1,partCol2...
+   * @param targetName simple target table name (i.e. name or alias)
+   */
+  private void addPartitionColsToSelect(List<FieldSchema> partCols, StringBuilder rewrittenQueryStr, String targetName) {
+    // If the table is partitioned, we need to select the partition columns as well.
+    if (partCols != null) {
+      for (FieldSchema fschema : partCols) {
+        rewrittenQueryStr.append(", ");
+        //would be nice if there was a way to determine if quotes are needed
+        if(targetName != null) {
+          rewrittenQueryStr.append(HiveUtils.unparseIdentifier(targetName, this.conf)).append('.');
+        }
+        rewrittenQueryStr.append(HiveUtils.unparseIdentifier(fschema.getName(), this.conf));
+      }
+    }
+  }
+  /**
+   * Assert that we are not asked to update a bucketing column or partition column
+   * @param colName it's the A in "SET A = B"
+   */
+  private void checkValidSetClauseTarget(ASTNode colName, List<FieldSchema> partCols,
+                                         List<String> bucketingCols) throws SemanticException {
+    String columnName = normalizeColName(colName.getText());
 
+    // Make sure this isn't one of the partitioning columns, that's not supported.
+    if (partCols != null) {
+      for (FieldSchema fschema : partCols) {
+        if (fschema.getName().equalsIgnoreCase(columnName)) {
+          throw new SemanticException(ErrorMsg.UPDATE_CANNOT_UPDATE_PART_VALUE.getMsg());
+        }
+      }
+    }
+    //updating bucket column should move row from one file to another - not supported
+    if(bucketingCols != null && bucketingCols.contains(columnName)) {
+      throw new SemanticException(ErrorMsg.UPDATE_CANNOT_UPDATE_BUCKET_VALUE,columnName);
+    }
+  }
+  private ASTNode findLHSofAssignment(ASTNode assignment) {
+    assert assignment.getToken().getType() == HiveParser.EQUAL :
+      "Expected set assignments to use equals operator but found " + assignment.getName();
+    ASTNode tableOrColTok = (ASTNode)assignment.getChildren().get(0);
+    assert tableOrColTok.getToken().getType() == HiveParser.TOK_TABLE_OR_COL :
+      "Expected left side of assignment to be table or column";
+    ASTNode colName = (ASTNode)tableOrColTok.getChildren().get(0);
+    assert colName.getToken().getType() == HiveParser.Identifier :
+      "Expected column name";
+    return colName;
+  }
+  private Map<String, ASTNode> collectSetColumnsAndExpressions(
+    ASTNode setClause,List<FieldSchema> partCols, List<String> bucketingCols, Set<String> setRCols)
+    throws SemanticException {
+    // An update needs to select all of the columns, as we rewrite the entire row.  Also,
+    // we need to figure out which columns we are going to replace.
+    assert setClause.getToken().getType() == HiveParser.TOK_SET_COLUMNS_CLAUSE :
+      "Expected second child of update token to be set token";
+
+    // Get the children of the set clause, each of which should be a column assignment
+    List<? extends Node> assignments = setClause.getChildren();
+    // Must be deterministic order map for consistent q-test output across Java versions
+    Map<String, ASTNode> setCols = new LinkedHashMap<String, ASTNode>(assignments.size());
+    for (Node a : assignments) {
+      ASTNode assignment = (ASTNode)a;
+      ASTNode colName = findLHSofAssignment(assignment);
+      if(setRCols != null) {
+        addSetRCols((ASTNode) assignment.getChildren().get(1), setRCols);
+      }
+      checkValidSetClauseTarget(colName, partCols, bucketingCols);
+
+      String columnName = normalizeColName(colName.getText());
+      // This means that in UPDATE T SET x = _something_
+      // _something_ can be whatever is supported in SELECT _something_
+      setCols.put(columnName, (ASTNode)assignment.getChildren().get(1));
+    }
+    return setCols;
+  }
+  /**
+   * @return the Metastore representation of the target table
+   */
+  private Table getTargetTable(ASTNode tabRef) throws SemanticException {
+    String[] tableName;
+    Table mTable;
+    switch (tabRef.getType()) {
+      case HiveParser.TOK_TABREF:
+        tableName = getQualifiedTableName((ASTNode) tabRef.getChild(0));
+        break;
+      case HiveParser.TOK_TABNAME:
+        tableName = getQualifiedTableName(tabRef);
+        break;
+      default:
+          throw raiseWrongType("TOK_TABREF|TOK_TABNAME", tabRef);
+    }
+    try {
+      mTable = db.getTable(tableName[0], tableName[1]);
+    } catch (InvalidTableException e) {
+      LOG.error("Failed to find table " + getDotName(tableName) + " got exception "
+        + e.getMessage());
+      throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(getDotName(tableName)), e);
+    } catch (HiveException e) {
+      LOG.error("Failed to find table " + getDotName(tableName) + " got exception "
+        + e.getMessage());
+      throw new SemanticException(e.getMessage(), e);
+    }
+    return mTable;
+  }
+  // Walk through all our inputs and set them to note that this read is part of an update or a
+  // delete.
+  private void markReadEntityForUpdate() {
+    for (ReadEntity input : inputs) {
+      if(isWritten(input)) {
+        //todo: this is actually not adding anything since LockComponent uses a Trie to "promote" a lock
+        //except by accident - when we have a partitioned target table we have a ReadEntity and WriteEntity
+        //for the table, so we mark ReadEntity and then delete WriteEntity (replace with Partition entries)
+        //so DbTxnManager skips Read lock on the ReadEntity....
+        input.setUpdateOrDelete(true);//input.noLockNeeded()?
+      }
+    }
+  }
+  /**
+   *  For updates, we need to set the column access info so that it contains information on
+   *  the columns we are updating.
+   *  (But not all the columns of the target table even though the rewritten query writes
+   *  all columns of target table since that is an implmentation detail)
+   */
+  private void setUpAccessControlInfoForUpdate(Table mTable, Map<String, ASTNode> setCols) {
+    ColumnAccessInfo cai = new ColumnAccessInfo();
+    for (String colName : setCols.keySet()) {
+      cai.add(Table.getCompleteName(mTable.getDbName(), mTable.getTableName()), colName);
+    }
+    setUpdateColumnAccessInfo(cai);
+  }
+  /**
+   * We need to weed ROW__ID out of the input column info, as it doesn't make any sense to
+   * require the user to have authorization on that column.
+   */
+  private void cleanUpMetaColumnAccessControl() {
+    //we do this for Update/Delete (incl Merge) because we introduce this column into the query
+    //as part of rewrite
+    if (columnAccessInfo != null) {
+      columnAccessInfo.stripVirtualColumn(VirtualColumn.ROWID);
+    }
+  }
+  /**
+   * Parse the newly generated SQL statment to get a new AST
+   */
+  private ReparseResult parseRewrittenQuery(StringBuilder rewrittenQueryStr, String originalQuery) throws SemanticException {
+    // Parse the rewritten query string
+    Context rewrittenCtx;
+    try {
+      // Set dynamic partitioning to nonstrict so that queries do not need any partition
+      // references.
+      // todo: this may be a perf issue as it prevents the optimizer.. or not
+      HiveConf.setVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+      rewrittenCtx = new Context(conf);
+      rewrittenCtx.setExplainConfig(ctx.getExplainConfig());
+    } catch (IOException e) {
+      throw new SemanticException(ErrorMsg.UPDATEDELETE_IO_ERROR.getMsg());
+    }
+    rewrittenCtx.setCmd(rewrittenQueryStr.toString());
+
+    ParseDriver pd = new ParseDriver();
+    ASTNode rewrittenTree;
+    try {
+      LOG.info("Going to reparse <" + originalQuery + "> as \n<" + rewrittenQueryStr.toString() + ">");
+      rewrittenTree = pd.parse(rewrittenQueryStr.toString(), rewrittenCtx);
+      rewrittenTree = ParseUtils.findRootNonNullToken(rewrittenTree);
+
+    } catch (ParseException e) {
+      throw new SemanticException(ErrorMsg.UPDATEDELETE_PARSE_ERROR.getMsg(), e);
+    }
+    return new ReparseResult(rewrittenTree, rewrittenCtx);
+  }
+  /**
+   * Assert it supports Acid write
+   */
+  private void validateTargetTable(Table mTable) throws SemanticException {
+    if (mTable.getTableType() == TableType.VIRTUAL_VIEW ||
+      mTable.getTableType() == TableType.MATERIALIZED_VIEW) {
+        LOG.error("Table " + getDotName(new String[] {mTable.getDbName(), mTable.getTableName()}) + " is a view or materialized view");
+        throw new SemanticException(ErrorMsg.UPDATE_DELETE_VIEW.getMsg());
+    }
+  }
+  /**
+   * This supports update and delete statements
+   */
   private void reparseAndSuperAnalyze(ASTNode tree) throws SemanticException {
     List<? extends Node> children = tree.getChildren();
     // The first child should be the table we are deleting from
     ASTNode tabName = (ASTNode)children.get(0);
     assert tabName.getToken().getType() == HiveParser.TOK_TABNAME :
         "Expected tablename as first child of " + operation() + " but found " + tabName.getName();
-    String[] tableName = getQualifiedTableName(tabName);
 
     // Rewrite the delete or update into an insert.  Crazy, but it works as deletes and update
     // actually are inserts into the delta file in Hive.  A delete
@@ -129,98 +335,31 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
     // merge on read.
 
     StringBuilder rewrittenQueryStr = new StringBuilder();
-    Table mTable;
-    try {
-      mTable = db.getTable(tableName[0], tableName[1]);
-    } catch (InvalidTableException e) {
-      LOG.error("Failed to find table " + getDotName(tableName) + " got exception "
-          + e.getMessage());
-      throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(getDotName(tableName)), e);
-    } catch (HiveException e) {
-      LOG.error("Failed to find table " + getDotName(tableName) + " got exception "
-          + e.getMessage());
-      throw new SemanticException(e.getMessage(), e);
-    }
-
-    if (mTable.getTableType() == TableType.VIRTUAL_VIEW ||
-        mTable.getTableType() == TableType.MATERIALIZED_VIEW) {
-      LOG.error("Table " + getDotName(tableName) + " is a view or materialized view");
-      throw new SemanticException(ErrorMsg.UPDATE_DELETE_VIEW.getMsg());
-    }
+    Table mTable = getTargetTable(tabName);
+    validateTargetTable(mTable);
 
     List<FieldSchema> partCols = mTable.getPartCols();
     List<String> bucketingCols = mTable.getBucketCols();
 
     rewrittenQueryStr.append("insert into table ");
-    rewrittenQueryStr.append(getDotName(new String[] {
-        HiveUtils.unparseIdentifier(tableName[0], this.conf),
-        HiveUtils.unparseIdentifier(tableName[1], this.conf) }));
+    rewrittenQueryStr.append(getFullTableNameForSQL(tabName));
 
-    // If the table is partitioned we have to put the partition() clause in
-    if (partCols != null && partCols.size() > 0) {
-      rewrittenQueryStr.append(" partition (");
-      boolean first = true;
-      for (FieldSchema fschema : partCols) {
-        if (first)
-          first = false;
-        else
-          rewrittenQueryStr.append(", ");
-        rewrittenQueryStr.append(HiveUtils.unparseIdentifier(fschema.getName(), this.conf));
-      }
-      rewrittenQueryStr.append(")");
-    }
+    addPartitionColsToInsert(partCols, rewrittenQueryStr);
 
     rewrittenQueryStr.append(" select ROW__ID");
+
     Map<Integer, ASTNode> setColExprs = null;
     Map<String, ASTNode> setCols = null;
     // Must be deterministic order set for consistent q-test output across Java versions
     Set<String> setRCols = new LinkedHashSet<String>();
     if (updating()) {
-      // An update needs to select all of the columns, as we rewrite the entire row.  Also,
-      // we need to figure out which columns we are going to replace.  We won't write the set
+      // We won't write the set
       // expressions in the rewritten query.  We'll patch that up later.
       // The set list from update should be the second child (index 1)
       assert children.size() >= 2 : "Expected update token to have at least two children";
       ASTNode setClause = (ASTNode)children.get(1);
-      assert setClause.getToken().getType() == HiveParser.TOK_SET_COLUMNS_CLAUSE :
-          "Expected second child of update token to be set token";
-
-      // Get the children of the set clause, each of which should be a column assignment
-      List<? extends Node> assignments = setClause.getChildren();
-      // Must be deterministic order map for consistent q-test output across Java versions
-      setCols = new LinkedHashMap<String, ASTNode>(assignments.size());
-      setColExprs = new HashMap<Integer, ASTNode>(assignments.size());
-      for (Node a : assignments) {
-        ASTNode assignment = (ASTNode)a;
-        assert assignment.getToken().getType() == HiveParser.EQUAL :
-            "Expected set assignments to use equals operator but found " + assignment.getName();
-        ASTNode tableOrColTok = (ASTNode)assignment.getChildren().get(0);
-        assert tableOrColTok.getToken().getType() == HiveParser.TOK_TABLE_OR_COL :
-            "Expected left side of assignment to be table or column";
-        ASTNode colName = (ASTNode)tableOrColTok.getChildren().get(0);
-        assert colName.getToken().getType() == HiveParser.Identifier :
-            "Expected column name";
-
-        addSetRCols((ASTNode) assignment.getChildren().get(1), setRCols);
-
-        String columnName = normalizeColName(colName.getText());
-
-        // Make sure this isn't one of the partitioning columns, that's not supported.
-        if (partCols != null) {
-          for (FieldSchema fschema : partCols) {
-            if (fschema.getName().equalsIgnoreCase(columnName)) {
-              throw new SemanticException(ErrorMsg.UPDATE_CANNOT_UPDATE_PART_VALUE.getMsg());
-            }
-          }
-        }
-        //updating bucket column should move row from one file to another - not supported
-        if(bucketingCols != null && bucketingCols.contains(columnName)) {
-          throw new SemanticException(ErrorMsg.UPDATE_CANNOT_UPDATE_BUCKET_VALUE,columnName);
-        }
-        // This means that in UPDATE T SET x = _something_
-        // _something_ can be whatever is supported in SELECT _something_
-        setCols.put(columnName, (ASTNode)assignment.getChildren().get(1));
-      }
+      setCols = collectSetColumnsAndExpressions(setClause, partCols, bucketingCols, setRCols);
+      setColExprs = new HashMap<Integer, ASTNode>(setClause.getChildCount());
 
       List<FieldSchema> nonPartCols = mTable.getCols();
       for (int i = 0; i < nonPartCols.size(); i++) {
@@ -237,17 +376,9 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
       }
     }
 
-    // If the table is partitioned, we need to select the partition columns as well.
-    if (partCols != null) {
-      for (FieldSchema fschema : partCols) {
-        rewrittenQueryStr.append(", ");
-        rewrittenQueryStr.append(HiveUtils.unparseIdentifier(fschema.getName(), this.conf));
-      }
-    }
+    addPartitionColsToSelect(partCols, rewrittenQueryStr, null);
     rewrittenQueryStr.append(" from ");
-    rewrittenQueryStr.append(getDotName(new String[] {
-        HiveUtils.unparseIdentifier(tableName[0], this.conf),
-        HiveUtils.unparseIdentifier(tableName[1], this.conf) }));
+    rewrittenQueryStr.append(getFullTableNameForSQL(tabName));
 
     ASTNode where = null;
     int whereIndex = deleting() ? 1 : 2;
@@ -260,35 +391,21 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
     // Add a sort by clause so that the row ids come out in the correct order
     rewrittenQueryStr.append(" sort by ROW__ID ");
 
-    // Parse the rewritten query string
-    Context rewrittenCtx;
-    try {
-      // Set dynamic partitioning to nonstrict so that queries do not need any partition
-      // references.
-      HiveConf.setVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
-      rewrittenCtx = new Context(conf);
-      rewrittenCtx.setExplainConfig(ctx.getExplainConfig());
-    } catch (IOException e) {
-      throw new SemanticException(ErrorMsg.UPDATEDELETE_IO_ERROR.getMsg());
-    }
-    rewrittenCtx.setCmd(rewrittenQueryStr.toString());
-    rewrittenCtx.setAcidOperation(ctx.getAcidOperation());
-
-    ParseDriver pd = new ParseDriver();
-    ASTNode rewrittenTree;
-    try {
-      LOG.info("Going to reparse " + operation() + " as <" + rewrittenQueryStr.toString() + ">");
-      rewrittenTree = pd.parse(rewrittenQueryStr.toString(), rewrittenCtx);
-      rewrittenTree = ParseUtils.findRootNonNullToken(rewrittenTree);
-
-    } catch (ParseException e) {
-      throw new SemanticException(ErrorMsg.UPDATEDELETE_PARSE_ERROR.getMsg(), e);
-    }
+    ReparseResult rr = parseRewrittenQuery(rewrittenQueryStr, ctx.getCmd());
+    Context rewrittenCtx = rr.rewrittenCtx;
+    ASTNode rewrittenTree = rr.rewrittenTree;
 
     ASTNode rewrittenInsert = (ASTNode)rewrittenTree.getChildren().get(1);
     assert rewrittenInsert.getToken().getType() == HiveParser.TOK_INSERT :
         "Expected TOK_INSERT as second child of TOK_QUERY but found " + rewrittenInsert.getName();
 
+    if(updating()) {
+      rewrittenCtx.addDestNamePrefix(rewrittenInsert, Context.DestClausePrefix.UPDATE);
+    }
+    else if(deleting()) {
+      rewrittenCtx.addDestNamePrefix(rewrittenInsert, Context.DestClausePrefix.DELETE);
+    }
+
     if (where != null) {
       // The structure of the AST for the rewritten insert statement is:
       // TOK_QUERY -> TOK_FROM
@@ -334,42 +451,38 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
       useSuper = false;
     }
 
-    // Walk through all our inputs and set them to note that this read is part of an update or a
-    // delete.
-    for (ReadEntity input : inputs) {
-      if(isWritten(input)) {
-        input.setUpdateOrDelete(true);
-      }
-    }
+    markReadEntityForUpdate();
 
     if (inputIsPartitioned(inputs)) {
+      //todo: there are bugs here: https://issues.apache.org/jira/browse/HIVE-15048
       // In order to avoid locking the entire write table we need to replace the single WriteEntity
       // with a WriteEntity for each partition
+      assert outputs.size() == 1 : "expected 1 WriteEntity. Got " + outputs;//this asserts comment above
+      WriteEntity original = null;
+      for(WriteEntity we : outputs) {
+        original = we;
+      }
       outputs.clear();
       for (ReadEntity input : inputs) {
+        /**
+         * The assumption here is that SemanticAnalyzer will will generate ReadEntity for each
+         * partition that exists and is matched by the WHERE clause (which may be all of them).
+         * Since we don't allow updating the value of a partition column, we know that we always
+         * write the same (or fewer) partitions than we read.  Still, the write is a Dynamic
+         * Partition write - see HIVE-15032.
+         */
         if (input.getTyp() == Entity.Type.PARTITION) {
           WriteEntity.WriteType writeType = deleting() ? WriteEntity.WriteType.DELETE :
               WriteEntity.WriteType.UPDATE;
-          outputs.add(new WriteEntity(input.getPartition(), writeType));
+          WriteEntity we = new WriteEntity(input.getPartition(), writeType);
+          we.setDynamicPartitionWrite(original.isDynamicPartitionWrite());
+          outputs.add(we);
         }
       }
-    } else {
-      // We still need to patch up the WriteEntities as they will have an insert type.  Change
-      // them to the appropriate type for our operation.
-      for (WriteEntity output : outputs) {
-        output.setWriteType(deleting() ? WriteEntity.WriteType.DELETE :
-            WriteEntity.WriteType.UPDATE);
-      }
     }
 
-    // For updates, we need to set the column access info so that it contains information on
-    // the columns we are updating.
     if (updating()) {
-      ColumnAccessInfo cai = new ColumnAccessInfo();
-      for (String colName : setCols.keySet()) {
-        cai.add(Table.getCompleteName(mTable.getDbName(), mTable.getTableName()), colName);
-      }
-      setUpdateColumnAccessInfo(cai);
+      setUpAccessControlInfoForUpdate(mTable, setCols);
 
       // Add the setRCols to the input list
       for (String colName : setRCols) {
@@ -379,14 +492,7 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
         }
       }
     }
-
-    // We need to weed ROW__ID out of the input column info, as it doesn't make any sense to
-    // require the user to have authorization on that column.
-    if (columnAccessInfo != null) {
-      columnAccessInfo.stripVirtualColumn(VirtualColumn.ROWID);
-    }
   }
-
   /**
    * Check that {@code readEntity} is also being written
    */
@@ -400,10 +506,11 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
     return false;
   }
   private String operation() {
-    if (updating()) return "update";
-    else if (deleting()) return "delete";
-    else throw new IllegalStateException("UpdateDeleteSemanticAnalyzer neither updating nor " +
-          "deleting, operation not known.");
+    if (currentOperation == Operation.NOT_ACID) {
+      throw new IllegalStateException("UpdateDeleteSemanticAnalyzer neither updating nor " +
+        "deleting, operation not known.");
+    }
+    return currentOperation.toString();
   }
 
   private boolean inputIsPartitioned(Set<ReadEntity> inputs) {
@@ -417,7 +524,7 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
     return false;
   }
 
-  // This method find any columns on the right side of a set statement (thus rcols) and puts them
+  // This method finds any columns on the right side of a set statement (thus rcols) and puts them
   // in a set so we can add them to the list of input cols to check.
   private void addSetRCols(ASTNode node, Set<String> setRCols) {
 
@@ -443,4 +550,566 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
   private static String normalizeColName(String colName) {
     return colName.toLowerCase();
   }
+
+  //todo: see SubQueryDiagnostic for some ideas on turning ASTNode into SQL
+  //todo: should we add MERGE to AcidUtils.Operation instead?  that will be a lot of code clean up
+  private enum Operation {UPDATE, DELETE, MERGE, NOT_ACID};
+  private Operation currentOperation = Operation.NOT_ACID;
+  private static final String Indent = "  ";
+
+  /**
+   * Here we take a Merge statement AST and generate a semantically equivalent multi-insert
+   * statement to exectue.  Each Insert leg represents a single WHEN clause.  As much as possible,
+   * the new SQL statement is made to look like the input SQL statement so that it's easier to map
+   * Query Compiler errors from generated SQL to original one this way.
+   * The generated SQL is a complete representation of the original input for the same reason.
+   * In many places SemanticAnalyzer throws exceptions that contain (line, position) coordinates.
+   * If generated SQL doesn't have everything and is patched up later, these coordinates point to
+   * the wrong place.
+   *
+   * @throws SemanticException
+   */
+  private void analyzeMerge(ASTNode tree) throws SemanticException {
+    currentOperation = Operation.MERGE;
+    /*
+     * See org.apache.hadoop.hive.ql.parse.TestMergeStatement for some examples of the merge AST
+      For example, given:
+      merge into acidTbl using nonAcidPart2 source ON acidTbl.a = source.a2
+      WHEN MATCHED THEN UPDATE set b = source.b2
+      WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2)
+
+      We get AST like this:
+      "(tok_merge " +
+        "(tok_tabname acidtbl) (tok_tabref (tok_tabname nonacidpart2) source) " +
+        "(= (. (tok_table_or_col acidtbl) a) (. (tok_table_or_col source) a2)) " +
+        "(tok_matched " +
+        "(tok_update " +
+        "(tok_set_columns_clause (= (tok_table_or_col b) (. (tok_table_or_col source) b2))))) " +
+        "(tok_not_matched " +
+        "tok_insert " +
+        "(tok_value_row (. (tok_table_or_col source) a2) (. (tok_table_or_col source) b2))))");
+
+        And need to produce a multi-insert like this to execute:
+        FROM acidTbl right outer join nonAcidPart2 ON acidTbl.a = source.a2
+        Insert into table acidTbl select nonAcidPart2.a2, nonAcidPart2.b2 where acidTbl.a is null
+        INSERT INTO TABLE acidTbl select target.ROW__ID, nonAcidPart2.a2, nonAcidPart2.b2 where nonAcidPart2.a2=acidTbl.a sort by acidTbl.ROW__ID
+    */
+    /*todo: we need some sort of validation phase over original AST to make things user friendly; for example, if
+     original command refers to a column that doesn't exist, this will be caught when processing the rewritten query but
+     the errors will point at locations that the user can't map to anything
+     - VALUES clause must have the same number of values as target table (including partition cols).  Part cols go last in Select clause of Insert as Select
+     todo: do we care to preserve comments in original SQL?
+     todo: check if identifiers are propertly escaped/quoted in the generated SQL - it's currently inconsistent
+      Look at UnparseTranslator.addIdentifierTranslation() - it does unescape + unparse...
+     todo: consider "WHEN NOT MATCHED BY SOURCE THEN UPDATE SET TargetTable.Col1 = SourceTable.Col1 "; what happens hwen source is empty?  This should be a runtime error - maybe not
+      the outer side of ROJ is empty => the join produces 0 rows.  If supporting WHEN NOT MATCHED BY SOURCE, then this should be a runtime error
+    */
+    ASTNode target = (ASTNode)tree.getChild(0);
+    ASTNode source = (ASTNode)tree.getChild(1);
+    String targetName = getSimpleTableName(target);
+    String sourceName = getSimpleTableName(source);
+    ASTNode onClause = (ASTNode) tree.getChild(2);
+
+    Table targetTable = getTargetTable(target);
+    validateTargetTable(targetTable);
+    List<ASTNode> whenClauses = findWhenClauses(tree);
+
+    StringBuilder rewrittenQueryStr = new StringBuilder("FROM\n");
+    rewrittenQueryStr.append(Indent).append(getFullTableNameForSQL(target));
+    if(isAliased(target)) {
+      rewrittenQueryStr.append(" ").append(targetName);
+    }
+    rewrittenQueryStr.append('\n');
+    rewrittenQueryStr.append(Indent).append(chooseJoinType(whenClauses)).append("\n");
+    if(source.getType() == HiveParser.TOK_SUBQUERY) {
+      //this includes the mandatory alias
+      rewrittenQueryStr.append(Indent).append(source.getMatchedText());
+    }
+    else {
+      rewrittenQueryStr.append(Indent).append(getFullTableNameForSQL(source));
+      if(isAliased(source)) {
+        rewrittenQueryStr.append(" ").append(sourceName);
+      }
+    }
+    rewrittenQueryStr.append('\n');
+    rewrittenQueryStr.append(Indent).append("ON ").append(onClause.getMatchedText()).append('\n');
+
+    /**
+     * We allow at most 2 WHEN MATCHED clause, in which case 1 must be Update the other Delete
+     * If we have both update and delete, the 1st one (in SQL code) must have "AND <extra predicate>"
+     * so that the 2nd can ensure not to process the same rows.
+     * Update and Delete may be in any order.  (Insert is always last)
+     */
+    String extraPredicate = null;
+    int numWhenMatchedUpdateClauses = 0, numWhenMatchedDeleteClauses = 0;
+    for(ASTNode whenClause : whenClauses) {
+      switch (getWhenClauseOperation(whenClause).getType()) {
+        case HiveParser.TOK_INSERT:
+          handleInsert(whenClause, rewrittenQueryStr, target, onClause, targetTable, targetName);
+          break;
+        case HiveParser.TOK_UPDATE:
+          numWhenMatchedUpdateClauses++;
+          String s = handleUpdate(whenClause, rewrittenQueryStr, target, onClause.getMatchedText(), targetTable, extraPredicate);
+          if(numWhenMatchedUpdateClauses + numWhenMatchedDeleteClauses == 1) {
+            extraPredicate = s;//i.e. it's the 1st WHEN MATCHED
+          }
+          break;
+        case HiveParser.TOK_DELETE:
+          numWhenMatchedDeleteClauses++;
+          String s1 = handleDelete(whenClause, rewrittenQueryStr, target, onClause.getMatchedText(), targetTable, extraPredicate);
+          if(numWhenMatchedUpdateClauses + numWhenMatchedDeleteClauses == 1) {
+            extraPredicate = s1;//i.e. it's the 1st WHEN MATCHED
+          }
+          break;
+        default:
+          throw new IllegalStateException("Unexpected WHEN clause type: " + whenClause.getType() +
+            addParseInfo(whenClause));
+      }
+      if(numWhenMatchedDeleteClauses > 1) {
+        throw new SemanticException(ErrorMsg.MERGE_TOO_MANY_DELETE, ctx.getCmd());
+      }
+      if(numWhenMatchedUpdateClauses > 1) {
+        throw new SemanticException(ErrorMsg.MERGE_TOO_MANY_UPDATE, ctx.getCmd());
+      }
+    }
+    if(numWhenMatchedDeleteClauses + numWhenMatchedUpdateClauses == 2 && extraPredicate == null) {
+      throw new SemanticException(ErrorMsg.MERGE_PREDIACTE_REQUIRED, ctx.getCmd());
+    }
+
+    ReparseResult rr = parseRewrittenQuery(rewrittenQueryStr, ctx.getCmd());
+    Context rewrittenCtx = rr.rewrittenCtx;
+    ASTNode rewrittenTree = rr.rewrittenTree;
+
+    //set dest name mapping on new context
+    for(int insClauseIdx = 1, whenClauseIdx = 0; insClauseIdx < rewrittenTree.getChildCount(); insClauseIdx++, whenClauseIdx++) {
+      //we've added Insert clauses in order or WHEN items in whenClauses
+      ASTNode insertClause = (ASTNode) rewrittenTree.getChild(insClauseIdx);
+      switch (getWhenClauseOperation(whenClauses.get(whenClauseIdx)).getType()) {
+        case HiveParser.TOK_INSERT:
+          rewrittenCtx.addDestNamePrefix(insertClause, Context.DestClausePrefix.INSERT);
+          break;
+        case HiveParser.TOK_UPDATE:
+          rewrittenCtx.addDestNamePrefix(insertClause, Context.DestClausePrefix.UPDATE);
+          break;
+        case HiveParser.TOK_DELETE:
+          rewrittenCtx.addDestNamePrefix(insertClause, Context.DestClausePrefix.DELETE);
+          break;
+        default:
+          assert false;
+      }
+    }
+    try {
+      useSuper = true;
+      super.analyze(rewrittenTree, rewrittenCtx);
+    } finally {
+      useSuper = false;
+    }
+
+    markReadEntityForUpdate();
+
+    if(targetTable.isPartitioned()) {
+      List<ReadEntity> partitionsRead = getRestrictedPartitionSet(targetTable);
+      if(!partitionsRead.isEmpty()) {
+        //if there is WriteEntity with WriteType=UPDATE/DELETE for target table, replace it with
+        //WriteEntity for each partition
+        List<WriteEntity> toRemove = new ArrayList<>();
+        for(WriteEntity we : outputs) {
+          WriteEntity.WriteType wt = we.getWriteType();
+          if(isTargetTable(we, targetTable) &&
+            (wt == WriteEntity.WriteType.UPDATE || wt == WriteEntity.WriteType.DELETE)) {
+            toRemove.add(we);
+          }
+        }
+        outputs.removeAll(toRemove);
+        for(ReadEntity re : partitionsRead) {
+          for(WriteEntity original : toRemove) {
+            //since we may have both Update and Delete branches, Auth needs to know
+            WriteEntity we = new WriteEntity(re.getPartition(), original.getWriteType());
+            we.setDynamicPartitionWrite(original.isDynamicPartitionWrite());
+            outputs.add(we);
+          }
+        }
+      }
+    }
+  }
+  /**
+   * If the optimizer has determined that it only has to read some of the partitions of the
+   * target table to satisfy the query, then we know that the write side of update/delete
+   * (and update/delete parts of merge)
+   * can only write (at most) that set of partitions (since we currently don't allow updating
+   * partition (or bucket) columns).  So we want to replace the table level
+   * WriteEntity in the outputs with WriteEntity for each of these partitions
+   * ToDo: see if this should be moved to SemanticAnalyzer itself since it applies to any
+   * insert which does a select against the same table.  Then SemanticAnalyzer would also
+   * be able to not use DP for the Insert...
+   *
+   * Note that the Insert of Merge may be creating new partitions and writing to partitions
+   * which were not read  (WHEN NOT MATCHED...)
+   */
+  private List<ReadEntity> getRestrictedPartitionSet(Table targetTable) {
+    List<ReadEntity> partitionsRead = new ArrayList<>();
+    for(ReadEntity re : inputs) {
+      if(re.isFromTopLevelQuery && re.getType() == Entity.Type.PARTITION && isTargetTable(re, targetTable)) {
+        partitionsRead.add(re);
+      }
+    }
+    return partitionsRead;
+  }
+  /**
+   * if there is no WHEN NOT MATCHED THEN INSERT, we don't outer join
+   */
+  private String chooseJoinType(List<ASTNode> whenClauses) {
+    for(ASTNode whenClause : whenClauses) {
+      if(getWhenClauseOperation(whenClause).getType() == HiveParser.TOK_INSERT) {
+        return "RIGHT OUTER JOIN";
+      }
+    }
+    return "INNER JOIN";
+  }
+  /**
+   * does this Entity belong to target table (partition)
+   */
+  private boolean isTargetTable(Entity entity, Table targetTable) {
+    //todo: https://issues.apache.org/jira/browse/HIVE-15048
+    /**
+     * is this the right way to compare?  Should it just compare paths?
+     * equals() impl looks heavy weight
+     */
+    return targetTable.equals(entity.getTable());
+  }
+  /**
+   * @param onClauseAsString - because there is no clone() and we need to use in multiple places
+   * @param deleteExtraPredicate - see notes at caller
+   */
+  private String handleUpdate(ASTNode whenMatchedUpdateClause, StringBuilder rewrittenQueryStr,
+                              ASTNode target, String onClauseAsString, Table targetTable,
+                              String deleteExtraPredicate) throws SemanticException {
+    assert whenMatchedUpdateClause.getType() == HiveParser.TOK_MATCHED;
+    assert getWhenClauseOperation(whenMatchedUpdateClause).getType() == HiveParser.TOK_UPDATE;
+    List<FieldSchema> partCols = targetTable.getPartCols();
+    List<String> bucketingCols = targetTable.getBucketCols();
+    String targetName = getSimpleTableName(target);
+    rewrittenQueryStr.append("INSERT INTO ").append(getFullTableNameForSQL(target));
+    addPartitionColsToInsert(partCols, rewrittenQueryStr);
+    rewrittenQueryStr.append("\n select ").append(targetName).append(".ROW__ID");
+
+    ASTNode setClause = (ASTNode)getWhenClauseOperation(whenMatchedUpdateClause).getChild(0);
+    //columns being updated -> update expressions; "setRCols" (last param) is null because we use actual expressions
+    //before reparsing, i.e. they are known to SemanticAnalyzer logic
+    Map<String, ASTNode> setColsExprs = collectSetColumnsAndExpressions(setClause, partCols, bucketingCols, null);
+    //if target table has cols c1,c2,c3 and p1 partition col and we had "SET c2 = 5, c1 = current_date()" we want to end up with
+    //insert into target (p1) select current_date(), 5, c3, p1 where ....
+    //since we take the RHS of set exactly as it was in Input, we don't need to deal with quoting/escaping column/table names
+    List<FieldSchema> nonPartCols = targetTable.getCols();
+    for(FieldSchema fs : nonPartCols) {
+      rewrittenQueryStr.append(", ");
+      String name = fs.getName();
+      if (setColsExprs.containsKey(name)) {
+        rewrittenQueryStr.append(setColsExprs.get(name).getMatchedText());
+      }
+      else {
+        //todo: is this the right way to get <table>.<colum> for target?
+        rewrittenQueryStr.append(getSimpleTableName(target)).append(".").append(HiveUtils.unparseIdentifier(name, this.conf));
+      }
+    }
+    addPartitionColsToSelect(partCols, rewrittenQueryStr, targetName);
+    rewrittenQueryStr.append("\n   WHERE ").append(onClauseAsString);
+    String extraPredicate = getWhenClausePredicate(whenMatchedUpdateClause);
+    if(extraPredicate != null) {
+      //we have WHEN MATCHED AND <boolean expr> THEN DELETE
+      rewrittenQueryStr.append(" AND ").append(extraPredicate);
+    }
+    if(deleteExtraPredicate != null) {
+      rewrittenQueryStr.append(" AND NOT(").append(deleteExtraPredicate).append(")");
+    }
+    rewrittenQueryStr.append("\n sort by ");
+    rewrittenQueryStr.append(targetName).append(".ROW__ID \n");
+
+    setUpAccessControlInfoForUpdate(targetTable, setColsExprs);
+    //we don't deal with columns on RHS of SET expression since the whole expr is part of the
+    //rewritten SQL statement and is thus handled by SemanticAnalzyer.  Nor do we have to
+    //figure which cols on RHS are from source and which from target
+
+    return extraPredicate;
+  }
+  /**
+   * @param onClauseAsString - because there is no clone() and we need to use in multiple places
+   * @param updateExtraPredicate - see notes at caller
+   */
+  private String handleDelete(ASTNode whenMatchedDeleteClause, StringBuilder rewrittenQueryStr, ASTNode target,
+                            String onClauseAsString, Table targetTable, String updateExtraPredicate) throws SemanticException {
+    assert whenMatchedDeleteClause.getType() == HiveParser.TOK_MATCHED;
+    assert getWhenClauseOperation(whenMatchedDeleteClause).getType() == HiveParser.TOK_DELETE;
+    List<FieldSchema> partCols = targetTable.getPartCols();
+    String targetName = getSimpleTableName(target);
+    rewrittenQueryStr.append("INSERT INTO ").append(getFullTableNameForSQL(target));
+    addPartitionColsToInsert(partCols, rewrittenQueryStr);
+
+    rewrittenQueryStr.append("\n select ").append(targetName).append(".ROW__ID ");
+    addPartitionColsToSelect(partCols, rewrittenQueryStr, targetName);
+    rewrittenQueryStr.append("\n   WHERE ").append(onClauseAsString);
+    String extraPredicate = getWhenClausePredicate(whenMatchedDeleteClause);
+    if(extraPredicate != null) {
+      //we have WHEN MATCHED AND <boolean expr> THEN DELETE
+      rewrittenQueryStr.append(" AND ").append(extraPredicate);
+    }
+    if(updateExtraPredicate != null) {
+      rewrittenQueryStr.append(" AND NOT(").append(updateExtraPredicate).append(")");
+    }
+    rewrittenQueryStr.append("\n sort by ");
+    rewrittenQueryStr.append(targetName).append(".ROW__ID \n");
+    return extraPredicate;
+  }
+  private static String addParseInfo(ASTNode n) {
+    return " at " + ErrorMsg.renderPosition(n);
+  }
+
+  /**
+   * Returns the table name to use in the generated query preserving original quotes/escapes if any
+   * @see #getFullTableNameForSQL(ASTNode)
+   */
+  private String getSimpleTableName(ASTNode n) throws SemanticException {
+    return HiveUtils.unparseIdentifier(getSimpleTableNameBase(n), this.conf);
+  }
+  private String getSimpleTableNameBase(ASTNode n) throws SemanticException {
+    switch (n.getType()) {
+      case HiveParser.TOK_TABREF:
+        int aliasIndex = findTabRefIdxs(n)[0];
+        if (aliasIndex != 0) {
+          return n.getChild(aliasIndex).getText();//the alias
+        }
+        return getSimpleTableNameBase((ASTNode) n.getChild(0));
+        case HiveParser.TOK_TABNAME:
+        if(n.getChildCount() == 2) {
+          //db.table -> return table
+          return n.getChild(1).getText();
+        }
+        return n.getChild(0).getText();
+      case HiveParser.TOK_SUBQUERY:
+        return n.getChild(1).getText();//the alias
+      default:
+        throw raiseWrongType("TOK_TABREF|TOK_TABNAME|TOK_SUBQUERY", n);
+    }
+  }
+  /**
+   * @return table name in db.table form with proper quoting/escaping to be used in a SQL statement
+   */
+  private String getFullTableNameForSQL(ASTNode n) throws SemanticException {
+    switch (n.getType()) {
+      case HiveParser.TOK_TABNAME:
+        String[] tableName = getQualifiedTableName(n);
+        return getDotName(new String[] {
+          HiveUtils.unparseIdentifier(tableName[0], this.conf),
+          HiveUtils.unparseIdentifier(tableName[1], this.conf) });
+      case HiveParser.TOK_TABREF:
+        return getFullTableNameForSQL((ASTNode) n.getChild(0));
+      default:
+        throw raiseWrongType("TOK_TABNAME", n);
+    }
+  }  private static final class ReparseResult {
+    private final ASTNode rewrittenTree;
+    private final Context rewrittenCtx;
+    ReparseResult(ASTNode n, Context c) {
+      rewrittenTree = n;
+      rewrittenCtx = c;
+    }
+  }
+  private static IllegalArgumentException raiseWrongType(String expectedTokName, ASTNode n) {
+    return new IllegalArgumentException("Expected " + expectedTokName + "; got " + n.getType());
+  }
+  private boolean isAliased(ASTNode n) {
+    switch (n.getType()) {
+      case HiveParser.TOK_TABREF:
+        return findTabRefIdxs(n)[0] != 0;
+      case HiveParser.TOK_TABNAME:
+        return false;
+      case HiveParser.TOK_SUBQUERY:
+        assert n.getChildCount() > 1 : "Expected Derived Table to be aliased";
+        return true;
+      default:
+        throw raiseWrongType("TOK_TABREF|TOK_TABNAME", n);
+    }
+  }
+  /**
+   * Collect WHEN clauses from Merge statement AST
+   */
+  private List<ASTNode> findWhenClauses(ASTNode tree) throws SemanticException {
+    assert tree.getType() == HiveParser.TOK_MERGE;
+    List<ASTNode> whenClauses = new ArrayList<>();
+    for(int idx = 3; idx < tree.getChildCount(); idx++) {
+      ASTNode whenClause = (ASTNode)tree.getChild(idx);
+      assert whenClause.getType() == HiveParser.TOK_MATCHED ||
+        whenClause.getType() == HiveParser.TOK_NOT_MATCHED :
+        "Unexpected node type found: " + whenClause.getType() + addParseInfo(whenClause);
+      whenClauses.add(whenClause);
+    }
+    if(whenClauses.size() <= 0) {
+      //Futureproofing: the parser will actually not allow this
+      throw new SemanticException("Must have at least 1 WHEN clause in MERGE statement");
+    }
+    return whenClauses;
+  }
+  private ASTNode getWhenClauseOperation(ASTNode whenClause) {
+    if(!(whenClause.getType() == HiveParser.TOK_MATCHED || whenClause.getType() == HiveParser.TOK_NOT_MATCHED)) {
+      throw  raiseWrongType("Expected TOK_MATCHED|TOK_NOT_MATCHED", whenClause);
+    }
+    return (ASTNode) whenClause.getChild(0);
+  }
+  /**
+   * returns the <boolean predicate> as in WHEN MATCHED AND <boolean predicate> THEN...
+   * @return may be null
+   */
+  private String getWhenClausePredicate(ASTNode whenClause) {
+    if(!(whenClause.getType() == HiveParser.TOK_MATCHED || whenClause.getType() == HiveParser.TOK_NOT_MATCHED)) {
+      throw  raiseWrongType("Expected TOK_MATCHED|TOK_NOT_MATCHED", whenClause);
+    }
+    if(whenClause.getChildCount() == 2) {
+      return ((ASTNode)whenClause.getChild(1)).getMatchedText();
+    }
+    return null;
+  }
+  /**
+   * Generates the Insert leg of the multi-insert SQL to represent WHEN NOT MATCHED THEN INSERT clause
+   * @param targetTableNameInSourceQuery - simple name/alias
+   * @throws SemanticException
+   */
+  private void handleInsert(ASTNode whenNotMatchedClause, StringBuilder rewrittenQueryStr, ASTNode target,
+                            ASTNode onClause, Table targetTable,
+                            String targetTableNameInSourceQuery) throws SemanticException{
+    assert whenNotMatchedClause.getType() == HiveParser.TOK_NOT_MATCHED;
+    assert getWhenClauseOperation(whenNotMatchedClause).getType() == HiveParser.TOK_INSERT;
+    List<FieldSchema> partCols = targetTable.getPartCols();
+
+    String valuesClause = ((ASTNode)getWhenClauseOperation(whenNotMatchedClause).getChild(0))
+      .getMatchedText();
+    valuesClause = valuesClause.substring(1, valuesClause.length() - 1);
+    rewrittenQueryStr.append("INSERT INTO ").append(getFullTableNameForSQL(target));
+    addPartitionColsToInsert(partCols, rewrittenQueryStr);
+
+    OnClauseAnalyzer oca = new OnClauseAnalyzer(onClause, targetTable, targetTableNameInSourceQuery);
+    oca.analyze();
+    rewrittenQueryStr.append("\n  select ")
+      .append(valuesClause).append("\n   WHERE ").append(oca.getPredicate());
+    String extraPredicate = getWhenClausePredicate(whenNotMatchedClause);
+    if(extraPredicate != null) {
+      //we have WHEN NOT MATCHED AND <boolean expr> THEN INSERT
+      rewrittenQueryStr.append(" AND ")
+        .append(((ASTNode)whenNotMatchedClause.getChild(1)).getMatchedText()).append('\n');
+    }
+  }
+  /**
+   * Suppose the input Merge statement has ON target.a = source.b and c = d.  Assume, that 'c' is from
+   * target table and 'd' is from source expression.  In order to properly
+   * generate the Insert for WHEN NOT MATCHED THEN INSERT, we need to make sure that the Where
+   * clause of this Insert contains "target.a is null and target.c is null"  This ensures that this
+   * Insert leg does not receive any rows that are processed by Insert corresponding to
+   * WHEN MATCHED THEN ... clauses.  (Implicit in this is a mini resolver that figures out if an
+   * unqualified column is part of the target table.  We can get away with this simple logic because
+   * we know that target is always a table (as opposed to some derived table).
+   * The job of this class is to generate this predicate.
+   *
+   * Note that is thi predicate cannot simply be NOT(on-clause-expr).  IF on-clause-expr evaluates
+   * to Unknown, it will be treated as False in the WHEN MATCHED Inserts but NOT(Unknown) = Unknown,
+   * and so it will be False for WHEN NOT MATCHED Insert...
+   */
+  private static final class OnClauseAnalyzer {
+    private final ASTNode onClause;
+    private final Map<String, List<String>> table2column = new HashMap<>();
+    private final List<String> unresolvedColumns = new ArrayList<>();
+    private final List<FieldSchema> allTargetTableColumns = new ArrayList<>();
+    private final Set<String> tableNamesFound = new HashSet<>();
+    private final String targetTableNameInSourceQuery;
+    /**
+     * @param targetTableNameInSourceQuery alias or simple name
+     */
+    OnClauseAnalyzer(ASTNode onClause, Table targetTable, String targetTableNameInSourceQuery) {
+      this.onClause = onClause;
+      allTargetTableColumns.addAll(targetTable.getCols());
+      allTargetTableColumns.addAll(targetTable.getPartCols());
+      this.targetTableNameInSourceQuery = unescapeIdentifier(targetTableNameInSourceQuery);
+    }
+    /**
+     * finds all columns and groups by table ref (if there is one)
+     */
+    private void visit(ASTNode n) {
+      if(n.getType() == HiveParser.TOK_TABLE_OR_COL) {
+        ASTNode parent = (ASTNode) n.getParent();
+        if(parent != null && parent.getType() == HiveParser.DOT) {
+          //the ref must be a table, so look for column name as right child of DOT
+          if(parent.getParent() != null && parent.getParent().getType() == HiveParser.DOT) {
+            //I don't think this can happen... but just in case
+            throw new IllegalArgumentException("Found unexpected db.table.col reference in " + onClause.getMatchedText());
+          }
+          addColumn2Table(n.getChild(0).getText(), parent.getChild(1).getText());
+        }
+        else {
+          //must be just a column name
+          unresolvedColumns.add(n.getChild(0).getText());
+        }
+      }
+      if(n.getChildCount() == 0) {
+        return;
+      }
+      for(Node child : n.getChildren()) {
+        visit((ASTNode)child);
+      }
+    }
+    private void analyze() {
+      visit(onClause);
+      int numTableRefs = tableNamesFound.size();
+      if(tableNamesFound.size() > 2) {
+        throw new IllegalArgumentException("Found > 2 table refs in ON clause.  Found " +
+          tableNamesFound + " in " + onClause.getMatchedText());
+      }
+      handleUnresolvedColumns();
+      if(tableNamesFound.size() > 2) {
+        throw new IllegalArgumentException("Found > 2 table refs in ON clause (incl unresolved).  " +
+          "Found " + tableNamesFound + " in " + onClause.getMatchedText());
+      }
+    }
+    /**
+     * Find those that belong to target table
+     */
+    private void handleUnresolvedColumns() {
+      if(unresolvedColumns.isEmpty()) { return; }
+      for(String c : unresolvedColumns) {
+        for(FieldSchema fs : allTargetTableColumns) {
+          if(c.equalsIgnoreCase(fs.getName())) {
+            //c belongs to target table; strictly speaking there maybe an ambiguous ref but
+            //this will be caught later when multi-insert is parsed
+            addColumn2Table(targetTableNameInSourceQuery.toLowerCase(), c);
+            break;
+          }
+        }
+      }
+    }
+    private void addColumn2Table(String tableName, String columnName) {
+      tableName = tableName.toLowerCase();//normalize name for mapping
+      tableNamesFound.add(tableName);
+      List<String> cols = table2column.get(tableName);
+      if(cols == null) {
+        cols = new ArrayList<>();
+        table2column.put(tableName, cols);
+      }
+      //we want to preserve 'columnName' as it was in original input query so that rewrite
+      //looks as much as possible like original query
+      cols.add(columnName);
+    }
+    /**
+     * Now generate the predicate for Where clause
+     */
+    private String getPredicate() {
+      //normilize table name for mapping
+      List<String> targetCols = table2column.get(targetTableNameInSourceQuery.toLowerCase());
+      StringBuilder sb = new StringBuilder();
+      for(String col : targetCols) {
+        if(sb.length() > 0) {
+          sb.append(" AND ");
+        }
+        //but preserve table name in SQL
+        sb.append(targetTableNameInSourceQuery).append(".").append(col).append(" IS NULL");
+      }
+      return sb.toString();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
index eafba21..60858e6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
@@ -516,7 +516,7 @@ public class CreateTableDesc extends DDLDesc implements Serializable {
           }
         }
         if (!found) {
-          throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg());
+          throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg(" \'" + bucketCol + "\'"));
         }
       }
     }
@@ -536,7 +536,7 @@ public class CreateTableDesc extends DDLDesc implements Serializable {
           }
         }
         if (!found) {
-          throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg());
+          throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg(" \'" + sortCol + "\'"));
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index f513d0f..f8ae86b 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -417,6 +417,7 @@ public class TestCompactionTxnHandler {
     long txnId = openTxns.getTxn_ids().get(0);
     // lock a table, as in dynamic partitions
     LockComponent lc = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, dbName);
+    lc.setIsDynamicPartitionWrite(true);
     lc.setTablename(tableName);
     DataOperationType dop = DataOperationType.UPDATE; 
     lc.setOperationType(dop);

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 64baa9f..68af15a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService;
@@ -580,4 +581,38 @@ public class TestTxnCommands {
     runStatementOnDriver("ALTER TABLE ex2.exchange_part_test2 ADD PARTITION (ds='2013-04-05')");
     runStatementOnDriver("ALTER TABLE ex1.exchange_part_test1 EXCHANGE PARTITION (ds='2013-04-05') WITH TABLE ex2.exchange_part_test2");
   }
+  @Test
+  public void testMergeNegative() throws Exception {
+    CommandProcessorResponse cpr = runStatementOnDriverNegative("MERGE INTO " + Table.ACIDTBL +
+      " target USING " + Table.NONACIDORCTBL +
+      " source\nON target.a = source.a " +
+      "\nWHEN MATCHED THEN UPDATE set b = 1 " +
+      "\nWHEN MATCHED THEN DELETE " +
+      "\nWHEN NOT MATCHED AND a < 1 THEN INSERT VALUES(1,2)");
+    Assert.assertEquals(ErrorMsg.MERGE_PREDIACTE_REQUIRED, ((HiveException)cpr.getException()).getCanonicalErrorMsg());
+  }
+  @Test
+  public void testMergeNegative2() throws Exception {
+    CommandProcessorResponse cpr = runStatementOnDriverNegative("MERGE INTO "+ Table.ACIDTBL +
+      " target USING " + Table.NONACIDORCTBL + "\n source ON target.pk = source.pk " +
+      "\nWHEN MATCHED THEN UPDATE set t = 1 " +
+      "\nWHEN MATCHED THEN UPDATE set b=a");
+    Assert.assertEquals(ErrorMsg.MERGE_TOO_MANY_UPDATE, ((HiveException)cpr.getException()).getCanonicalErrorMsg());
+  }
+  @Ignore
+  @Test
+  public void testSpecialChar() throws Exception {
+    String target = "`aci/d_u/ami`";
+    String src = "`src/name`";
+    runStatementOnDriver("drop table if exists " + target);
+    runStatementOnDriver("drop table if exists " + src);
+    runStatementOnDriver("create table " + target + "(i int," +
+      "`d?*de e` decimal(5,2)," +
+      "vc varchar(128)) clustered by (i) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("create table " + src + "(`g/h` int, j decimal(5,2), k varchar(128))");
+    runStatementOnDriver("merge into " + target + " as `d/8` using " + src + " as `a/b` on i=`g/h` " +
+      "\nwhen matched and i > 5 then delete " +
+      "\nwhen matched then update set vc=`\u2206\u220b` " +
+      "\nwhen not matched then insert values(`a/b`.`g/h`,`a/b`.j,`a/b`.k)");
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 949e071..49ba667 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -64,12 +64,15 @@ import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * TODO: this should be merged with TestTxnCommands once that is checked in
  * specifically the tests; the supporting code here is just a clone of TestTxnCommands
  */
 public class TestTxnCommands2 {
+  static final private Logger LOG = LoggerFactory.getLogger(TestTxnCommands2.class);
   protected static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
     File.separator + TestTxnCommands2.class.getCanonicalName()
     + "-" + System.currentTimeMillis()
@@ -86,7 +89,9 @@ public class TestTxnCommands2 {
     ACIDTBL("acidTbl"),
     ACIDTBLPART("acidTblPart"),
     NONACIDORCTBL("nonAcidOrcTbl"),
-    NONACIDPART("nonAcidPart");
+    NONACIDPART("nonAcidPart"),
+    NONACIDPART2("nonAcidPart2"),
+    ACIDNESTEDPART("acidNestedPart");
 
     private final String name;
     @Override
@@ -126,11 +131,17 @@ public class TestTxnCommands2 {
     }
     SessionState.start(new SessionState(hiveConf));
     d = new Driver(hiveConf);
+    d.setMaxRows(10000);
     dropTables();
     runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES (" + tableProperties + ")");
     runStatementOnDriver("create table " + Table.ACIDTBLPART + "(a int, b int) partitioned by (p string) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES (" + tableProperties + ")");
     runStatementOnDriver("create table " + Table.NONACIDORCTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
     runStatementOnDriver("create table " + Table.NONACIDPART + "(a int, b int) partitioned by (p string) stored as orc TBLPROPERTIES ('transactional'='false')");
+    runStatementOnDriver("create table " + Table.NONACIDPART2 +
+      "(a2 int, b2 int) partitioned by (p2 string) stored as orc TBLPROPERTIES ('transactional'='false')");
+    runStatementOnDriver("create table " + Table.ACIDNESTEDPART +
+      "(a int, b int) partitioned by (p int, q int) clustered by (a) into " + BUCKET_COUNT +
+      " buckets stored as orc TBLPROPERTIES (" + tableProperties + ")");
   }
 
   protected void dropTables() throws Exception {
@@ -1333,6 +1344,409 @@ public class TestTxnCommands2 {
     String[] expectedResult = { "1\tfoo\tNULL", "2\tbar\tNULL" };
     Assert.assertEquals(Arrays.asList(expectedResult), rs);
   }
+  /**
+   * Test that ACID works with multi-insert statement
+   */
+  @Test
+  public void testMultiInsertStatement() throws Exception {
+    int[][] sourceValsOdd = {{5,5},{11,11}};
+    int[][] sourceValsEven = {{2,2}};
+    //populate source
+    runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='odd') " + makeValuesClause(sourceValsOdd));
+    runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='even') " + makeValuesClause(sourceValsEven));
+    int[][] targetValsOdd = {{5,6},{7,8}};
+    int[][] targetValsEven = {{2,1},{4,3}};
+    //populate target
+    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " PARTITION(p='odd') " + makeValuesClause(targetValsOdd));
+    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " PARTITION(p='even') " + makeValuesClause(targetValsEven));
+    List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBLPART + " order by a,b");
+    int[][] targetVals = {{2,1},{4,3},{5,6},{7,8}};
+    Assert.assertEquals(stringifyValues(targetVals), r);
+    //currently multi-insrt doesn't allow same table/partition in > 1 output branch
+    String s = "from " + Table.ACIDTBLPART + "  target right outer join " +
+      Table.NONACIDPART2 + " source on target.a = source.a2 " +
+      " INSERT INTO TABLE " + Table.ACIDTBLPART + " PARTITION(p='even') select source.a2, source.b2 where source.a2=target.a " +
+      " insert into table " + Table.ACIDTBLPART + " PARTITION(p='odd') select source.a2,source.b2 where target.a is null";
+    //r = runStatementOnDriver("explain formatted " + s);
+    //LOG.info("Explain formatted: " + r.toString());
+    runStatementOnDriver(s);
+    r = runStatementOnDriver("select a,b from " + Table.ACIDTBLPART + " where p='even' order by a,b");
+    int[][] rExpected = {{2,1},{2,2},{4,3},{5,5}};
+    Assert.assertEquals(stringifyValues(rExpected), r);
+    r = runStatementOnDriver("select a,b from " + Table.ACIDTBLPART + " where p='odd' order by a,b");
+    int[][] rExpected2 = {{5,6},{7,8},{11,11}};
+    Assert.assertEquals(stringifyValues(rExpected2), r);
+  }
+  /**
+   * check that we can specify insert columns
+   *
+   * Need to figure out semantics: what if a row from base expr ends up in both Update and Delete clauses we'll write
+   * Update event to 1 delta and Delete to another.  Given that we collapse events for same current txn for different stmt ids
+   * to the latest one, delete will win.
+   * In Acid 2.0 we'll end up with 2 Delete events for the same PK.  Logically should be OK, but may break Vectorized reader impl.... need to check
+   *
+   * 1:M from target to source results in ambiguous write to target - SQL Standard expects an error.  (I have an argument on how
+   * to solve this with minor mods to Join operator written down somewhere)
+   *
+   * Only need 1 Stats task for MERGE (currently we get 1 per branch).
+   * Should also eliminate Move task - that's a general ACID task
+   */
+  private void logResuts(List<String> r, String header, String prefix) {
+    LOG.info(prefix + " " + header);
+    StringBuilder sb = new StringBuilder();
+    int numLines = 0;
+    for(String line : r) {
+      numLines++;
+      sb.append(prefix).append(line).append("\n");
+    }
+    LOG.info(sb.toString());
+    LOG.info(prefix + " Printed " + numLines + " lines");
+  }
+
+
+  /**
+   * This tests that we handle non-trivial ON clause correctly
+   * @throws Exception
+   */
+  @Test
+  public void testMerge() throws Exception {
+    int[][] baseValsOdd = {{5,5},{11,11}};
+    runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='odd') " + makeValuesClause(baseValsOdd));
+    int[][] vals = {{2,1},{4,3},{5,6},{7,8}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals));
+    List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals(stringifyValues(vals), r);
+    String query = "merge into " + Table.ACIDTBL + 
+      " using " + Table.NONACIDPART2 + " source ON " + Table.ACIDTBL + ".a = a2 and b + 1 = source.b2 + 1 " +
+      "WHEN MATCHED THEN UPDATE set b = source.b2 " +
+      "WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2)";
+    runStatementOnDriver(query);
+
+    r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    int[][] rExpected = {{2,1},{4,3},{5,5},{5,6},{7,8},{11,11}};
+    Assert.assertEquals(stringifyValues(rExpected), r);
+  }
+  @Test
+  public void testMergeWithPredicate() throws Exception {
+    int[][] baseValsOdd = {{2,2},{5,5},{8,8},{11,11}};
+    runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='odd') " + makeValuesClause(baseValsOdd));
+    int[][] vals = {{2,1},{4,3},{5,6},{7,8}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals));
+    List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals(stringifyValues(vals), r);
+    String query = "merge into " + Table.ACIDTBL +
+      " t using " + Table.NONACIDPART2 + " s ON t.a = s.a2 " +
+      "WHEN MATCHED AND t.b between 1 and 3 THEN UPDATE set b = s.b2 " +
+      "WHEN NOT MATCHED and s.b2 >= 11 THEN INSERT VALUES(s.a2, s.b2)";
+    runStatementOnDriver(query);
+
+    r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    int[][] rExpected = {{2,2},{4,3},{5,6},{7,8},{11,11}};
+    Assert.assertEquals(stringifyValues(rExpected), r);
+  }
+
+  /**
+   * Test combines update + insert clauses
+   * @throws Exception
+   */
+  @Test
+  public void testMerge2() throws Exception {
+    int[][] baseValsOdd = {{5,5},{11,11}};
+    int[][] baseValsEven = {{2,2},{4,44}};
+    runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='odd') " + makeValuesClause(baseValsOdd));
+    runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='even') " + makeValuesClause(baseValsEven));
+    int[][] vals = {{2,1},{4,3},{5,6},{7,8}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals));
+    List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals(stringifyValues(vals), r);
+    String query = "merge into " + Table.ACIDTBL +
+      " using " + Table.NONACIDPART2 + " source ON " + Table.ACIDTBL + ".a = source.a2 " +
+      "WHEN MATCHED THEN UPDATE set b = source.b2 " +
+      "WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2) ";//AND b < 1
+    r = runStatementOnDriver(query);
+    //r = runStatementOnDriver("explain  " + query);
+    //logResuts(r, "Explain logical1", "");
+
+    r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    int[][] rExpected = {{2,2},{4,44},{5,5},{7,8},{11,11}};
+    Assert.assertEquals(stringifyValues(rExpected), r);
+  }
+
+  /**
+   * test combines delete + insert clauses
+   * @throws Exception
+   */
+  @Test
+  public void testMerge3() throws Exception {
+    int[][] baseValsOdd = {{5,5},{11,11}};
+    int[][] baseValsEven = {{2,2},{4,44}};
+    runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='odd') " + makeValuesClause(baseValsOdd));
+    runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='even') " + makeValuesClause(baseValsEven));
+    int[][] vals = {{2,1},{4,3},{5,6},{7,8}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals));
+    List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    Assert.assertEquals(stringifyValues(vals), r);
+    String query = "merge into " + Table.ACIDTBL +
+      " using " + Table.NONACIDPART2 + " source ON " + Table.ACIDTBL + ".a = source.a2 " +
+      "WHEN MATCHED THEN DELETE " +
+      "WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2) ";
+    runStatementOnDriver(query);
+
+    r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    int[][] rExpected = {{7,8},{11,11}};
+    Assert.assertEquals(stringifyValues(rExpected), r);
+  }
+  /**
+   * https://hortonworks.jira.com/browse/BUG-66580
+   * @throws Exception
+   */
+  @Ignore
+  @Test
+  public void testMultiInsert() throws Exception {
+    runStatementOnDriver("create table if not exists  srcpart (a int, b int, c int) " +
+      "partitioned by (z int) clustered by (a) into 2 buckets " +
+      "stored as orc tblproperties('transactional'='true')");
+    runStatementOnDriver("create temporary table if not exists data1 (x int)");
+//    runStatementOnDriver("create temporary table if not exists data2 (x int)");
+
+    runStatementOnDriver("insert into data1 values (1),(2),(3)");
+//    runStatementOnDriver("insert into data2 values (4),(5),(6)");
+    d.destroy();
+    hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+    d = new Driver(hiveConf);
+    List<String> r = runStatementOnDriver(" from data1 " +
+      "insert into srcpart partition(z) select 0,0,1,x  " +
+      "insert into srcpart partition(z=1) select 0,0,1");
+  }
+  /**
+   * Investigating DP and WriteEntity, etc
+   * @throws Exception
+   */
+  @Test
+  @Ignore
+  public void testDynamicPartitions() throws Exception {
+    d.destroy();
+    hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+    //In DbTxnManager.acquireLocks() we have
+    // 1 ReadEntity: default@values__tmp__table__1
+    // 1 WriteEntity: default@acidtblpart Type=TABLE WriteType=INSERT isDP=false
+    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,1,'p1'),(2,2,'p1'),(3,3,'p1'),(4,4,'p2')");
+    
+    List<String> r1 = runStatementOnDriver("select count(*) from " + Table.ACIDTBLPART);
+    Assert.assertEquals("4", r1.get(0));
+    //In DbTxnManager.acquireLocks() we have
+    // 2 ReadEntity: [default@acidtblpart@p=p1, default@acidtblpart]
+    // 1 WriteEntity: default@acidtblpart Type=TABLE WriteType=INSERT isDP=false
+    //todo: side note on the above: LockRequestBuilder combines the both default@acidtblpart entries to 1
+    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) select * from " + Table.ACIDTBLPART + " where p='p1'");
+    
+    //In DbTxnManager.acquireLocks() we have
+    // 2 ReadEntity: [default@acidtblpart@p=p1, default@acidtblpart]
+    // 1 WriteEntity: default@acidtblpart@p=p2 Type=PARTITION WriteType=INSERT isDP=false
+    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p='p2') select a,b from " + Table.ACIDTBLPART + " where p='p1'");
+    
+    //In UpdateDeleteSemanticAnalyzer, after super analyze
+    // 3 ReadEntity: [default@acidtblpart, default@acidtblpart@p=p1, default@acidtblpart@p=p2]
+    // 1 WriteEntity: [default@acidtblpart TABLE/INSERT]
+    //after UDSA
+    // Read [default@acidtblpart, default@acidtblpart@p=p1, default@acidtblpart@p=p2]
+    // Write [default@acidtblpart@p=p1, default@acidtblpart@p=p2] - PARTITION/UPDATE, PARTITION/UPDATE
+    //todo: Why acquire per partition locks - if you have many partitions that's hugely inefficient.
+    //could acquire 1 table level Shared_write intead
+    runStatementOnDriver("update " + Table.ACIDTBLPART + " set b = 1");
+    
+    //In UpdateDeleteSemanticAnalyzer, after super analyze
+    // Read [default@acidtblpart, default@acidtblpart@p=p1]
+    // Write default@acidtblpart TABLE/INSERT
+    //after UDSA
+    // Read [default@acidtblpart, default@acidtblpart@p=p1]
+    // Write [default@acidtblpart@p=p1] PARTITION/UPDATE
+    //todo: this causes a Read lock on the whole table - clearly overkill
+    //for Update/Delete we always write exactly (at most actually) the partitions we read
+    runStatementOnDriver("update " + Table.ACIDTBLPART + " set b = 1 where p='p1'");
+  }
+  @Test
+  public void testDynamicPartitionsMerge() throws Exception {
+    d.destroy();
+    hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,1,'p1'),(2,2,'p1'),(3,3,'p1'),(4,4,'p2')");
+
+    List<String> r1 = runStatementOnDriver("select count(*) from " + Table.ACIDTBLPART);
+    Assert.assertEquals("4", r1.get(0));
+    int[][] sourceVals = {{2,15},{4,44},{5,5},{11,11}};
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(sourceVals));
+    runStatementOnDriver("merge into " + Table.ACIDTBLPART + " using " + Table.NONACIDORCTBL +
+      " as s ON " + Table.ACIDTBLPART + ".a = s.a " +
+      "when matched then update set b = s.b " +
+      "when not matched then insert values(s.a, s.b, 'new part')");
+    r1 = runStatementOnDriver("select p,a,b from " + Table.ACIDTBLPART + " order by p, a, b");
+    String result= r1.toString();
+    Assert.assertEquals("[new part\t5\t5, new part\t11\t11, p1\t1\t1, p1\t2\t15, p1\t3\t3, p2\t4\t44]", result);
+  }
+  /**
+   * Using nested partitions and thus DummyPartition
+   * @throws Exception
+   */
+  @Test
+  public void testDynamicPartitionsMerge2() throws Exception {
+    d.destroy();
+    hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+    int[][] targetVals = {{1,1,1},{2,2,2},{3,3,1},{4,4,2}};
+    runStatementOnDriver("insert into " + Table.ACIDNESTEDPART + " partition(p=1,q) " + makeValuesClause(targetVals));
+
+    List<String> r1 = runStatementOnDriver("select count(*) from " + Table.ACIDNESTEDPART);
+    Assert.assertEquals("4", r1.get(0));
+    int[][] sourceVals = {{2,15},{4,44},{5,5},{11,11}};
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(sourceVals));
+    runStatementOnDriver("merge into " + Table.ACIDNESTEDPART + " using " + Table.NONACIDORCTBL +
+      " as s ON " + Table.ACIDNESTEDPART + ".a = s.a " +
+      "when matched then update set b = s.b " +
+      "when not matched then insert values(s.a, s.b, 3,4)");
+    r1 = runStatementOnDriver("select p,q,a,b from " + Table.ACIDNESTEDPART + " order by p,q, a, b");
+    Assert.assertEquals(stringifyValues(new int[][] {{1,1,1,1},{1,1,3,3},{1,2,2,15},{1,2,4,44},{3,4,5,5},{3,4,11,11}}), r1);
+  }
+  @Ignore("Covered elsewhere")
+  @Test
+  public void testMergeAliasedTarget() throws Exception {
+    int[][] baseValsOdd = {{2,2},{4,44},{5,5},{11,11}};
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(baseValsOdd));
+    int[][] vals = {{2,1},{4,3},{5,6},{7,8}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals));
+    String query = "merge into " + Table.ACIDTBL +
+      " as target using " + Table.NONACIDORCTBL + " source ON target.a = source.a " +
+      "WHEN MATCHED THEN update set b = 0 " +
+      "WHEN NOT MATCHED THEN INSERT VALUES(source.a, source.b) ";
+    runStatementOnDriver(query);
+
+    List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    int[][] rExpected = {{2,0},{4,0},{5,0},{7,8},{11,11}};
+    Assert.assertEquals(stringifyValues(rExpected), r);
+  }
+  @Test
+  public void testMergeUpdateDelete() throws Exception {
+    int[][] baseValsOdd = {{2,2},{4,44},{5,5},{11,11}};
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(baseValsOdd));
+    int[][] vals = {{2,1},{4,3},{5,6},{7,8}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals));
+    String query = "merge into " + Table.ACIDTBL +
+      " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " +
+      "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " +
+      "WHEN MATCHED and t.a > 3 and t.a < 5 THEN DELETE " +
+      "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) ";
+    runStatementOnDriver(query);
+
+    List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    int[][] rExpected = {{2,0},{5,6},{7,8},{11,11}};
+    Assert.assertEquals(stringifyValues(rExpected), r);
+  }
+  @Test
+  public void testMergeDeleteUpdate() throws Exception {
+    int[][] sourceVals = {{2,2},{4,44},{5,5},{11,11}};
+    runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(sourceVals));
+    int[][] targetVals = {{2,1},{4,3},{5,6},{7,8}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(targetVals));
+    String query = "merge into " + Table.ACIDTBL +
+      " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " +
+      "WHEN MATCHED and s.a < 5 THEN DELETE " +
+      "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " +
+      "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) ";
+    runStatementOnDriver(query);
+
+    List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    int[][] rExpected = {{5,6},{7,8},{11,11}};
+    Assert.assertEquals(stringifyValues(rExpected), r);
+  }
+
+  /**
+   * https://www.linkedin.com/pulse/how-load-slowly-changing-dimension-type-2-using-oracle-padhy
+   */
+  @Test
+  public void testMergeType2SCD01() throws Exception {
+    runStatementOnDriver("drop table if exists target");
+    runStatementOnDriver("drop table if exists source");
+    runStatementOnDriver("drop table if exists splitTable");
+
+    runStatementOnDriver("create table splitTable(op int)");
+    runStatementOnDriver("insert into splitTable values (0),(1)");
+    runStatementOnDriver("create table source (key int, data int)");
+    runStatementOnDriver("create table target (key int, data int, cur int) clustered by (key) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    int[][] targetVals = {{1, 5, 1}, {2, 6, 1}, {1, 18, 0}};
+    runStatementOnDriver("insert into target " + makeValuesClause(targetVals));
+    int[][] sourceVals = {{1, 7}, {3, 8}};
+    runStatementOnDriver("insert into source " + makeValuesClause(sourceVals));
+    //augment source with a col which has 1 if it will cause an update in target, 0 otherwise
+    String curMatch = "select s.*, case when t.cur is null then 0 else 1 end m from source s left outer join (select * from target where target.cur=1) t on s.key=t.key";
+    //split each row (duplicate) which will cause an update into 2 rows and augment with 'op' col which has 0 to insert, 1 to update
+    String teeCurMatch = "select curMatch.*, case when splitTable.op is null or splitTable.op = 0 then 0 else 1 end op from (" + curMatch + ") curMatch left outer join splitTable on curMatch.m=1";
+    if(false) {
+      //this is just for debug
+      List<String> r1 = runStatementOnDriver(curMatch);
+      List<String> r2 = runStatementOnDriver(teeCurMatch);
+    }
+    String stmt = "merge into target t using (" + teeCurMatch + ") s on t.key=s.key and t.cur=1 and s.op=1 " +
+      "when matched then update set cur=0 " +
+      "when not matched then insert values(s.key,s.data,1)";
+
+    runStatementOnDriver(stmt);
+    int[][] resultVals = {{1,5,0},{1,7,1},{1,18,0},{2,6,1},{3,8,1}};
+    List<String> r = runStatementOnDriver("select * from target order by key,data,cur");
+    Assert.assertEquals(stringifyValues(resultVals), r);
+  }
+  /**
+   * https://www.linkedin.com/pulse/how-load-slowly-changing-dimension-type-2-using-oracle-padhy
+   * Same as testMergeType2SCD01 but with a more intuitive "source" expression
+   */
+  @Test
+  public void testMergeType2SCD02() throws Exception {
+    runStatementOnDriver("drop table if exists target");
+    runStatementOnDriver("drop table if exists source");
+    runStatementOnDriver("create table source (key int, data int)");
+    runStatementOnDriver("create table target (key int, data int, cur int) clustered by (key) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    int[][] targetVals = {{1, 5, 1}, {2, 6, 1}, {1, 18, 0}};
+    runStatementOnDriver("insert into target " + makeValuesClause(targetVals));
+    int[][] sourceVals = {{1, 7}, {3, 8}};
+    runStatementOnDriver("insert into source " + makeValuesClause(sourceVals));
+
+    String baseSrc =  "select source.*, 0 c from source " +
+    "union all " +
+      "select source.*, 1 c from source " +
+      "inner join target " +
+      "on source.key=target.key where target.cur=1";
+    if(false) {
+      //this is just for debug
+      List<String> r1 = runStatementOnDriver(baseSrc);
+      List<String> r2 = runStatementOnDriver(
+        "select t.*, s.* from target t right outer join (" + baseSrc + ") s " +
+          "\non t.key=s.key and t.cur=s.c and t.cur=1");
+    }
+    String stmt = "merge into target t using " +
+      "(" + baseSrc + ") s " +
+      "on t.key=s.key and t.cur=s.c and t.cur=1 " +
+      "when matched then update set cur=0 " +
+      "when not matched then insert values(s.key,s.data,1)";
+
+    runStatementOnDriver(stmt);
+    int[][] resultVals = {{1,5,0},{1,7,1},{1,18,0},{2,6,1},{3,8,1}};
+    List<String> r = runStatementOnDriver("select * from target order by key,data,cur");
+    Assert.assertEquals(stringifyValues(resultVals), r);
+  }
+
+  @Test
+  @Ignore("Values clause with table constructor not yet supported")
+  public void testValuesSource() throws Exception {
+    int[][] targetVals = {{2,1},{4,3},{5,6},{7,8}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(targetVals));
+    String query = "merge into " + Table.ACIDTBL +
+      " as t using (select * from (values (2,2),(4,44),(5,5),(11,11)) as F(a,b)) s ON t.a = s.a " +
+      "WHEN MATCHED and s.a < 5 THEN DELETE " +
+      "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " +
+      "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) ";
+    runStatementOnDriver(query);
+
+    List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+    int[][] rExpected = {{5,6},{7,8},{11,11}};
+    Assert.assertEquals(stringifyValues(rExpected), r);
+  }
 
   /**
    * takes raw data and turns it into a string as if from Driver.getResults()
@@ -1389,6 +1803,7 @@ public class TestTxnCommands2 {
   }
 
   protected List<String> runStatementOnDriver(String stmt) throws Exception {
+    LOG.info("+runStatementOnDriver(" + stmt + ")");
     CommandProcessorResponse cpr = d.run(stmt);
     if(cpr.getResponseCode() != 0) {
       throw new RuntimeException(stmt + " failed: " + cpr);

http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java
index c2330cb..c4dead8 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java
@@ -18,21 +18,16 @@
 
 package org.apache.hadoop.hive.ql;
 
-import java.io.File;
 import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
-import org.apache.hadoop.hive.ql.io.HiveInputFormat;
-import org.apache.hadoop.hive.ql.session.SessionState;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -545,4 +540,10 @@ public class TestTxnCommands2WithSplitUpdate extends TestTxnCommands2 {
     resultCount = 2;
     Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
   }
+  @Test
+  @Ignore
+  public void testMergeType2SCD01() throws Exception {}
+  @Test
+  @Ignore
+  public void testMergeType2SCD02() throws Exception {}
 }