Posted to commits@hive.apache.org by ek...@apache.org on 2016/11/12 20:20:19 UTC
[1/3] hive git commit: HIVE-14943 Base Implementation (of HIVE-10924) (Eugene Koifman, reviewed by Alan Gates)
Repository: hive
Updated Branches:
refs/heads/master 52ba014fc -> e00f909dd
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 4e1122d..637a01a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.lockmgr;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
import org.apache.hadoop.hive.metastore.api.DataOperationType;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -45,6 +46,7 @@ import org.junit.Ignore;
import org.junit.Test;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -99,6 +101,7 @@ public class TestDbTxnManager2 {
}
@Test
public void testLocksInSubquery() throws Exception {
+ dropTable(new String[] {"T","S", "R"});
checkCmdOnDriver(driver.run("create table if not exists T (a int, b int)"));
checkCmdOnDriver(driver.run("create table if not exists S (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"));
checkCmdOnDriver(driver.run("create table if not exists R (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"));
@@ -132,6 +135,7 @@ public class TestDbTxnManager2 {
}
@Test
public void createTable() throws Exception {
+ dropTable(new String[] {"T"});
CommandProcessorResponse cpr = driver.compileAndRespond("create table if not exists T (a int, b int)");
checkCmdOnDriver(cpr);
txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer");
@@ -143,6 +147,7 @@ public class TestDbTxnManager2 {
}
@Test
public void insertOverwriteCreate() throws Exception {
+ dropTable(new String[] {"T2", "T3"});
CommandProcessorResponse cpr = driver.run("create table if not exists T2(a int)");
checkCmdOnDriver(cpr);
cpr = driver.run("create table if not exists T3(a int)");
@@ -163,6 +168,7 @@ public class TestDbTxnManager2 {
}
@Test
public void insertOverwritePartitionedCreate() throws Exception {
+ dropTable(new String[] {"T4"});
CommandProcessorResponse cpr = driver.run("create table if not exists T4 (name string, gpa double) partitioned by (age int)");
checkCmdOnDriver(cpr);
cpr = driver.run("create table if not exists T5(name string, age int, gpa double)");
@@ -183,6 +189,7 @@ public class TestDbTxnManager2 {
}
@Test
public void basicBlocking() throws Exception {
+ dropTable(new String[] {"T6"});
CommandProcessorResponse cpr = driver.run("create table if not exists T6(a int)");
checkCmdOnDriver(cpr);
cpr = driver.compileAndRespond("select a from T6");
@@ -213,6 +220,7 @@ public class TestDbTxnManager2 {
}
@Test
public void lockConflictDbTable() throws Exception {
+ dropTable(new String[] {"temp.T7"});
CommandProcessorResponse cpr = driver.run("create database if not exists temp");
checkCmdOnDriver(cpr);
cpr = driver.run("create table if not exists temp.T7(a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
@@ -240,6 +248,7 @@ public class TestDbTxnManager2 {
}
@Test
public void updateSelectUpdate() throws Exception {
+ dropTable(new String[] {"T8"});
CommandProcessorResponse cpr = driver.run("create table T8(a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
checkCmdOnDriver(cpr);
cpr = driver.compileAndRespond("delete from T8 where b = 89");
@@ -273,6 +282,7 @@ public class TestDbTxnManager2 {
@Test
public void testLockRetryLimit() throws Exception {
+ dropTable(new String[] {"T9"});
conf.setIntVar(HiveConf.ConfVars.HIVE_LOCK_NUMRETRIES, 2);
conf.setBoolVar(HiveConf.ConfVars.TXN_MGR_DUMP_LOCK_STATE_ON_ACQUIRE_TIMEOUT, true);
HiveTxnManager otherTxnMgr = new DbTxnManager();
@@ -309,6 +319,7 @@ public class TestDbTxnManager2 {
*/
@Test
public void testLockBlockedBy() throws Exception {
+ dropTable(new String[] {"TAB_BLOCKED"});
CommandProcessorResponse cpr = driver.run("create table TAB_BLOCKED (a int, b int) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
checkCmdOnDriver(cpr);
cpr = driver.compileAndRespond("select * from TAB_BLOCKED");
@@ -330,6 +341,7 @@ public class TestDbTxnManager2 {
@Test
public void testDummyTxnManagerOnAcidTable() throws Exception {
+ dropTable(new String[] {"T10", "T11"});
// Create an ACID table with DbTxnManager
CommandProcessorResponse cpr = driver.run("create table T10 (a int, b int) clustered by(b) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
checkCmdOnDriver(cpr);
@@ -389,6 +401,8 @@ public class TestDbTxnManager2 {
*/
@Test
public void testMetastoreTablesCleanup() throws Exception {
+ dropTable(new String[] {"temp.T10", "temp.T11", "temp.T12p", "temp.T13p"});
+
CommandProcessorResponse cpr = driver.run("create database if not exists temp");
checkCmdOnDriver(cpr);
@@ -569,6 +583,7 @@ public class TestDbTxnManager2 {
*/
@Test
public void checkExpectedLocks() throws Exception {
+ dropTable(new String[] {"acidPart", "nonAcidPart"});
CommandProcessorResponse cpr = null;
cpr = driver.run("create table acidPart(a int, b int) partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
checkCmdOnDriver(cpr);
@@ -640,8 +655,7 @@ public class TestDbTxnManager2 {
*/
@Test
public void checkExpectedLocks2() throws Exception {
- checkCmdOnDriver(driver.run("drop table if exists tab_acid"));
- checkCmdOnDriver(driver.run("drop table if exists tab_not_acid"));
+ dropTable(new String[] {"tab_acid", "tab_not_acid"});
checkCmdOnDriver(driver.run("create table if not exists tab_acid (a int, b int) partitioned by (p string) " +
"clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"));
checkCmdOnDriver(driver.run("create table if not exists tab_not_acid (na int, nb int) partitioned by (np string) " +
@@ -676,7 +690,7 @@ public class TestDbTxnManager2 {
}
/** The list is small, and the object is generated, so we don't use sets/equals/etc. */
- public static void checkLock(LockType expectedType, LockState expectedState, String expectedDb,
+ public static ShowLocksResponseElement checkLock(LockType expectedType, LockState expectedState, String expectedDb,
String expectedTable, String expectedPartition, List<ShowLocksResponseElement> actuals) {
for (ShowLocksResponseElement actual : actuals) {
if (expectedType == actual.getType() && expectedState == actual.getState()
@@ -684,11 +698,12 @@ public class TestDbTxnManager2 {
&& StringUtils.equals(normalizeCase(expectedTable), normalizeCase(actual.getTablename()))
&& StringUtils.equals(
normalizeCase(expectedPartition), normalizeCase(actual.getPartname()))) {
- return;
+ return actual;
}
}
Assert.fail("Could't find {" + expectedType + ", " + expectedState + ", " + expectedDb
+ ", " + expectedTable + ", " + expectedPartition + "} in " + actuals);
+ throw new IllegalStateException("How did it get here?!");
}
@Test
@@ -878,6 +893,7 @@ public class TestDbTxnManager2 {
*/
@Test
public void testWriteSetTracking1() throws Exception {
+ dropTable(new String[] {"TAB_PART"});
CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " +
"partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
checkCmdOnDriver(cpr);
@@ -893,11 +909,17 @@ public class TestDbTxnManager2 {
txnMgr2.acquireLocks(driver.getPlan(), ctx, "Nicholas");
txnMgr2.commitTxn();
}
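+  /** Drops each of the given tables if it exists, so every test starts against a clean metastore. */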
+ private void dropTable(String[] tabs) throws Exception {
+ for(String tab : tabs) {
+ driver.run("drop table if exists " + tab);
+ }
+ }
/**
* txns overlap in time but do not update same resource - no conflict
*/
@Test
public void testWriteSetTracking2() throws Exception {
+ dropTable(new String[] {"TAB_PART", "TAB2"});
CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " +
"partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
checkCmdOnDriver(cpr);
@@ -919,33 +941,42 @@ public class TestDbTxnManager2 {
txnMgr2.acquireLocks(driver.getPlan(), ctx, "Catherine");
txnMgr2.commitTxn();
}
-
/**
* txns overlap and update the same resource - can't commit 2nd txn
*/
@Test
public void testWriteSetTracking3() throws Exception {
+ dropTable(new String[] {"TAB_PART"});
CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " +
"partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
checkCmdOnDriver(cpr);
+ checkCmdOnDriver(driver.run("insert into TAB_PART partition(p='blah') values(1,2)"));
HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
- txnMgr.openTxn("Known");
- txnMgr2.openTxn("Unknown");
+ long txnId = txnMgr.openTxn("Known");
+ long txnId2 = txnMgr2.openTxn("Unknown");
checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
txnMgr.acquireLocks(driver.getPlan(), ctx, "Known");
List<ShowLocksResponseElement> locks = getLocks(txnMgr);
Assert.assertEquals("Unexpected lock count", 1, locks.size());
- checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks);
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", "p=blah", locks);
checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Unknown", false);
locks = getLocks(txnMgr2);//should not matter which txnMgr is used here
Assert.assertEquals("Unexpected lock count", 2, locks.size());
- checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks);
- checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB_PART", null, locks);
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", "p=blah", locks);
+ checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB_PART", "p=blah", locks);
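+    //simulate what the execution engine would do: register the partition the update actually wrote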
+ AddDynamicPartitions adp = new AddDynamicPartitions(txnId, "default", "TAB_PART",
+ Collections.singletonList("p=blah"));
+ adp.setOperationType(DataOperationType.UPDATE);
+ txnHandler.addDynamicPartitions(adp);
txnMgr.commitTxn();
+
+ adp.setTxnid(txnId2);
+ txnHandler.addDynamicPartitions(adp);
LockException expectedException = null;
try {
+ //with HIVE-15032 this should use static parts and thus not need addDynamicPartitions
txnMgr2.commitTxn();
}
catch (LockException e) {
@@ -953,8 +984,8 @@ public class TestDbTxnManager2 {
}
Assert.assertTrue("Didn't get exception", expectedException != null);
Assert.assertEquals("Got wrong message code", ErrorMsg.TXN_ABORTED, expectedException.getCanonicalErrorMsg());
- Assert.assertEquals("Exception msg didn't match",
- "Aborting [txnid:2,2] due to a write conflict on default/tab_part committed by [txnid:1,2]",
+ Assert.assertEquals("Exception msg didn't match",
+ "Aborting [txnid:3,3] due to a write conflict on default/TAB_PART/p=blah committed by [txnid:2,3] u/u",
expectedException.getCause().getMessage());
}
/**
@@ -963,6 +994,7 @@ public class TestDbTxnManager2 {
*/
@Test
public void testWriteSetTracking4() throws Exception {
+ dropTable(new String[] {"TAB_PART", "TAB2"});
Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " +
"partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
@@ -1040,26 +1072,33 @@ public class TestDbTxnManager2 {
*/
@Test
public void testWriteSetTracking5() throws Exception {
+ dropTable(new String[] {"TAB_PART"});
Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
CommandProcessorResponse cpr = driver.run("create table if not exists TAB_PART (a int, b int) " +
"partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
checkCmdOnDriver(cpr);
+ checkCmdOnDriver(driver.run("insert into TAB_PART partition(p='blah') values(1,2)"));
HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
txnMgr.openTxn("Known");
- txnMgr2.openTxn("Unknown");
+ long txnId = txnMgr2.openTxn("Unknown");
checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
txnMgr.acquireLocks(driver.getPlan(), ctx, "Known");
List<ShowLocksResponseElement> locks = getLocks(txnMgr);
Assert.assertEquals("Unexpected lock count", 1, locks.size());
- checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks);
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", "p=blah", locks);
checkCmdOnDriver(driver.compileAndRespond("update TAB_PART set b = 7 where p = 'blah'"));
((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Unknown", false);
locks = getLocks(txnMgr2);//should not matter which txnMgr is used here
Assert.assertEquals("Unexpected lock count", 2, locks.size());
- checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", null, locks);
- checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB_PART", null, locks);
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "TAB_PART", "p=blah", locks);
+ checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB_PART", "p=blah", locks);
txnMgr.rollbackTxn();
+
+ AddDynamicPartitions adp = new AddDynamicPartitions(txnId, "default", "TAB_PART",
+ Arrays.asList("p=blah"));
+ adp.setOperationType(DataOperationType.UPDATE);
+ txnHandler.addDynamicPartitions(adp);
Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
txnMgr2.commitTxn();//since conflicting txn rolled back, commit succeeds
Assert.assertEquals(1, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
@@ -1069,6 +1108,7 @@ public class TestDbTxnManager2 {
*/
@Test
public void testWriteSetTracking6() throws Exception {
+ dropTable(new String[] {"TAB2"});
Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
CommandProcessorResponse cpr = driver.run("create table if not exists TAB2(a int, b int) clustered " +
"by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
@@ -1102,6 +1142,7 @@ public class TestDbTxnManager2 {
*/
@Test
public void testWriteSetTracking7() throws Exception {
+ dropTable(new String[] {"tab2", "TAB2"});
Assert.assertEquals(0, TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET"));
CommandProcessorResponse cpr = driver.run("create table if not exists tab2 (a int, b int) " +
"partitioned by (p string) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
@@ -1209,6 +1250,7 @@ public class TestDbTxnManager2 {
*/
@Test
public void testWriteSetTracking8() throws Exception {
+ dropTable(new String[] {"tab1", "TAB1"});
CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
"clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
checkCmdOnDriver(cpr);
@@ -1262,6 +1304,7 @@ public class TestDbTxnManager2 {
*/
@Test
public void testWriteSetTracking9() throws Exception {
+ dropTable(new String[] {"TAB1"});
CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
"clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
checkCmdOnDriver(cpr);
@@ -1321,6 +1364,7 @@ public class TestDbTxnManager2 {
*/
@Test
public void testWriteSetTracking10() throws Exception {
+ dropTable(new String[] {"TAB1"});
CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
"clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
checkCmdOnDriver(cpr);
@@ -1369,7 +1413,7 @@ public class TestDbTxnManager2 {
}
Assert.assertNotEquals("Expected exception", null, exception);
Assert.assertEquals("Exception msg doesn't match",
- "Aborting [txnid:3,3] due to a write conflict on default/tab1/p=two committed by [txnid:2,3]",
+ "Aborting [txnid:3,3] due to a write conflict on default/tab1/p=two committed by [txnid:2,3] d/u",
exception.getCause().getMessage());
Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString("select * from WRITE_SET"),
@@ -1382,6 +1426,7 @@ public class TestDbTxnManager2 {
*/
@Test
public void testWriteSetTracking11() throws Exception {
+ dropTable(new String[] {"TAB1"});
CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
"clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
checkCmdOnDriver(cpr);
@@ -1442,6 +1487,7 @@ public class TestDbTxnManager2 {
}
@Test
public void testCompletedTxnComponents() throws Exception {
+ dropTable(new String[] {"TAB1", "tab_not_acid2"});
CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
"clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
checkCmdOnDriver(cpr);
@@ -1462,6 +1508,7 @@ public class TestDbTxnManager2 {
*/
@Test
public void testMultiInsert() throws Exception {
+ dropTable(new String[] {"TAB1", "tab_not_acid"});
checkCmdOnDriver(driver.run("drop table if exists tab1"));
checkCmdOnDriver(driver.run("drop table if exists tab_not_acid"));
CommandProcessorResponse cpr = driver.run("create table if not exists tab1 (a int, b int) partitioned by (p string) " +
@@ -1518,4 +1565,568 @@ public class TestDbTxnManager2 {
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "XYZ", null, locks);
Assert.assertEquals("Wrong AgentInfo", driver.getPlan().getQueryId(), locks.get(0).getAgentInfo());
}
+ @Test
+ public void testMerge3Way01() throws Exception {
+ testMerge3Way(false);
+ }
+ @Test
+ public void testMerge3Way02() throws Exception {
+ testMerge3Way(true);
+ }
+
+  /**
+   * @param cc whether to cause a write-write (WW) conflict between the two transactions
+   * @throws Exception
+   */
+ private void testMerge3Way(boolean cc) throws Exception {
+ dropTable(new String[] {"target","source", "source2"});
+ checkCmdOnDriver(driver.run("create table target (a int, b int) " +
+ "partitioned by (p int, q int) clustered by (a) into 2 buckets " +
+ "stored as orc TBLPROPERTIES ('transactional'='true')"));
+    //in practice we don't really care about the data in any of these tables (except insofar as
+    //it creates partitions); the SQL being tested is not actually executed, and the resulting
+    //ACID metadata is supplied manually via addDynamicPartitions(). But having data makes
+    //the intent easier to follow
+ checkCmdOnDriver(driver.run("insert into target partition(p,q) values (1,2,1,2), (3,4,1,2), (5,6,1,3), (7,8,2,2)"));
+ checkCmdOnDriver(driver.run("create table source (a int, b int, p int, q int)"));
+ checkCmdOnDriver(driver.run("insert into source values " +
+ // I-(1/2) D-(1/2) I-(1/3) U-(1/3) D-(2/2) I-(1/1) - new part
+ "(9,10,1,2), (3,4,1,2), (11,12,1,3), (5,13,1,3), (7,8,2,2), (14,15,1,1)"));
+ checkCmdOnDriver(driver.run("create table source2 (a int, b int, p int, q int)"));
+ checkCmdOnDriver(driver.run("insert into source2 values " +
+ //cc ? -:U-(1/2) D-(1/2) cc ? U-(1/3):- D-(2/2) I-(1/1) - new part 2
+ "(9,100,1,2), (3,4,1,2), (5,13,1,3), (7,8,2,2), (14,15,2,1)"));
+
+
+ long txnId1 = txnMgr.openTxn("T1");
+ checkCmdOnDriver(driver.compileAndRespond("merge into target t using source s on t.a=s.b " +
+ "when matched and t.a=5 then update set b=s.b " + //updates p=1/q=3
+ "when matched and t.a in (3,7) then delete " + //deletes from p=1/q=2, p=2/q=2
+ "when not matched and t.a >= 8 then insert values(s.a, s.b, s.p, s.q)"));//insert p=1/q=2, p=1/q=3 and new part 1/1
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "T1");
+ List<ShowLocksResponseElement> locks = getLocks(txnMgr, true);
+ Assert.assertEquals("Unexpected lock count", 5, locks.size());
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks);
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "source", null, locks);
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=2", locks);
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=3", locks);
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=2/q=2", locks);
+
+ //start concurrent txn
+ DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ long txnId2 = txnMgr2.openTxn("T2");
+ checkCmdOnDriver(driver.compileAndRespond("merge into target t using source2 s on t.a=s.b " +
+ "when matched and t.a=" + (cc ? 5 : 9) + " then update set b=s.b " + //if conflict updates p=1/q=3 else update p=1/q=2
+ "when matched and t.a in (3,7) then delete " + //deletes from p=1/q=2, p=2/q=2
+ "when not matched and t.a >= 8 then insert values(s.a, s.b, s.p, s.q)"));//insert p=1/q=2, p=1/q=3 and new part 1/1
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "T1", false);
+ locks = getLocks(txnMgr2, true);
+ Assert.assertEquals("Unexpected lock count", 10, locks.size());
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks);
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "source", null, locks);
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=2", locks);
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=3", locks);
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=2/q=2", locks);
+
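+    //remember the id of T2's blocked lock set; after T1 commits we use it to re-check the Waiting locks below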
+ long extLockId = checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "target", null, locks).getLockid();
+ checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "source2", null, locks);
+ checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "target", "p=1/q=2", locks);
+ checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "target", "p=1/q=3", locks);
+ checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "target", "p=2/q=2", locks);
+
+ Assert.assertEquals(
+ "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " +
+ TxnDbUtil.queryToString("select * from TXN_COMPONENTS"),
+ 0,
+ TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId1));
+ //complete 1st txn
+ AddDynamicPartitions adp = new AddDynamicPartitions(txnId1, "default", "target",
+ Collections.singletonList("p=1/q=3"));//update clause
+ adp.setOperationType(DataOperationType.UPDATE);
+ txnHandler.addDynamicPartitions(adp);
+ adp = new AddDynamicPartitions(txnId1, "default", "target",
+ Arrays.asList("p=1/q=2","p=2/q=2"));//delete clause
+ adp.setOperationType(DataOperationType.DELETE);
+ txnHandler.addDynamicPartitions(adp);
+ adp = new AddDynamicPartitions(txnId1, "default", "target",
+ Arrays.asList("p=1/q=2","p=1/q=3","p=1/q=1"));//insert clause
+ adp.setOperationType(DataOperationType.INSERT);
+ txnHandler.addDynamicPartitions(adp);
+ Assert.assertEquals(
+ "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " +
+ TxnDbUtil.queryToString("select * from TXN_COMPONENTS"),
+ 1,
+ TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId1 +
+ " and tc_operation_type='u'"));
+ Assert.assertEquals(
+ "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " +
+ TxnDbUtil.queryToString("select * from TXN_COMPONENTS"),
+ 2,
+ TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId1 +
+ " and tc_operation_type='d'"));
+ Assert.assertEquals(
+ "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " +
+ TxnDbUtil.queryToString("select * from TXN_COMPONENTS"),
+ 3,
+ TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId1 +
+ " and tc_operation_type='i'"));
+ txnMgr.commitTxn();//commit T1
+ Assert.assertEquals(
+ "COMPLETED_TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " +
+ TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+ 6,
+ TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + txnId1));
+ Assert.assertEquals(
+ "WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " +
+ TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1,
+ TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_txnid=" + txnId1 +
+ " and ws_operation_type='u'"));
+ Assert.assertEquals(
+ "WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " +
+ TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 2,
+ TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_txnid=" + txnId1 +
+ " and ws_operation_type='d'"));
+
+ //re-check locks which were in Waiting state - should now be Acquired
+ ((DbLockManager)txnMgr2.getLockManager()).checkLock(extLockId);
+ locks = getLocks(txnMgr2, true);
+ Assert.assertEquals("Unexpected lock count", 5, locks.size());
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks);
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "source2", null, locks);
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=2", locks);
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=3", locks);
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=2/q=2", locks);
+
+ Assert.assertEquals(
+ "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " +
+ TxnDbUtil.queryToString("select * from TXN_COMPONENTS"),
+ 0,
+ TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId2));
+ //complete 2nd txn
+ adp = new AddDynamicPartitions(txnId2, "default", "target",
+ Collections.singletonList(cc ? "p=1/q=3" : "p=1/p=2"));//update clause
+ adp.setOperationType(DataOperationType.UPDATE);
+ txnHandler.addDynamicPartitions(adp);
+ adp = new AddDynamicPartitions(txnId2, "default", "target",
+ Arrays.asList("p=1/q=2","p=2/q=2"));//delete clause
+ adp.setOperationType(DataOperationType.DELETE);
+ txnHandler.addDynamicPartitions(adp);
+ adp = new AddDynamicPartitions(txnId2, "default", "target",
+ Arrays.asList("p=1/q=2","p=1/q=3","p=1/q=1"));//insert clause
+ adp.setOperationType(DataOperationType.INSERT);
+ txnHandler.addDynamicPartitions(adp);
+ Assert.assertEquals(
+ "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " +
+ TxnDbUtil.queryToString("select * from TXN_COMPONENTS"),
+ 1,
+ TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId2 +
+ " and tc_operation_type='u'"));
+ Assert.assertEquals(
+ "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " +
+ TxnDbUtil.queryToString("select * from TXN_COMPONENTS"),
+ 2,
+ TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId2 +
+ " and tc_operation_type='d'"));
+ Assert.assertEquals(
+ "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " +
+ TxnDbUtil.queryToString("select * from TXN_COMPONENTS"),
+ 3,
+ TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId2 +
+ " and tc_operation_type='i'"));
+
+ LockException expectedException = null;
+ try {
+ txnMgr2.commitTxn();//commit T2
+ }
+ catch (LockException e) {
+ expectedException = e;
+ }
+ if(cc) {
+ Assert.assertNotNull("didn't get exception", expectedException);
+ Assert.assertEquals("Transaction manager has aborted the transaction txnid:3. Reason: " +
+ "Aborting [txnid:3,3] due to a write conflict on default/target/p=1/q=3 " +
+ "committed by [txnid:2,3] u/u", expectedException.getMessage());
+ Assert.assertEquals(
+ "COMPLETED_TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " +
+ TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+ 0,
+ TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + txnId2));
+ Assert.assertEquals(
+ "WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " +
+ TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 0,
+ TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_txnid=" + txnId2));
+ }
+ else {
+ Assert.assertNull("Unexpected exception " + expectedException, expectedException);
+ Assert.assertEquals(
+ "COMPLETED_TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " +
+ TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+ 6,
+ TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + txnId2));
+ Assert.assertEquals(
+ "WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " +
+ TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1,
+ TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_txnid=" + txnId2 +
+ " and ws_operation_type='u'"));
+ Assert.assertEquals(
+ "WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnId2) + "): " +
+ TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 2,
+ TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_txnid=" + txnId2 +
+ " and ws_operation_type='d'"));
+ }
+
+
+ }
+ @Test
+ public void testMergeUnpartitioned01() throws Exception {
+ testMergeUnpartitioned(true);
+ }
+ @Test
+ public void testMergeUnpartitioned02() throws Exception {
+ testMergeUnpartitioned(false);
+ }
+
+  /**
+   * Run a MERGE statement against an un-partitioned target table with a concurrent operation
+   * on the target. Check that the proper locks are acquired, that write-conflict detection
+   * works, and that the internal ACID metadata tables end up in the expected state.
+   * @param causeConflict true to make the 2 operations update the same entity
+   * @throws Exception
+   */
+ private void testMergeUnpartitioned(boolean causeConflict) throws Exception {
+ dropTable(new String[] {"target","source"});
+ checkCmdOnDriver(driver.run("create table target (a int, b int) " +
+ "clustered by (a) into 2 buckets " +
+ "stored as orc TBLPROPERTIES ('transactional'='true')"));
+ checkCmdOnDriver(driver.run("insert into target values (1,2), (3,4), (5,6), (7,8)"));
+ checkCmdOnDriver(driver.run("create table source (a int, b int)"));
+
+ long txnid1 = txnMgr.openTxn("T1");
+ if(causeConflict) {
+ checkCmdOnDriver(driver.compileAndRespond("update target set b = 2 where a=1"));
+ }
+ else {
+ checkCmdOnDriver(driver.compileAndRespond("insert into target values(9,10),(11,12)"));
+ }
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "T1");
+ Assert.assertEquals(
+ "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid1) + "): " +
+ TxnDbUtil.queryToString("select * from TXN_COMPONENTS"),
+ 1,//no DP, so it's populated from lock info
+ TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid1));
+
+ List<ShowLocksResponseElement> locks = getLocks(txnMgr, true);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(causeConflict ? LockType.SHARED_WRITE : LockType.SHARED_READ,
+ LockState.ACQUIRED, "default", "target", null, locks);
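+    //the update takes SHARED_WRITE on target; the insert only needs SHARED_READ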
+
+ DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ //start a 2nd (overlapping) txn
+ long txnid2 = txnMgr2.openTxn("T2");
+ checkCmdOnDriver(driver.compileAndRespond("merge into target t using source s " +
+ "on t.a=s.a " +
+ "when matched then delete " +
+ "when not matched then insert values(s.a,s.b)"));
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2", false);
+ locks = getLocks(txnMgr, true);
+
+ Assert.assertEquals("Unexpected lock count", 3, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", null, locks);
+ checkLock(LockType.SHARED_READ, causeConflict ? LockState.WAITING : LockState.ACQUIRED,
+ "default", "source", null, locks);
+ long extLockId = checkLock(LockType.SHARED_WRITE, causeConflict ? LockState.WAITING : LockState.ACQUIRED,
+ "default", "target", null, locks).getLockid();
+
+ txnMgr.commitTxn();//commit T1
+
+ Assert.assertEquals("WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnid1) + "): " +
+ TxnDbUtil.queryToString("select * from WRITE_SET"),
+ causeConflict ? 1 : 0,//Inserts are not tracked by WRITE_SET
+ TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_txnid=" + txnid1 +
+ " and ws_operation_type=" + (causeConflict ? "'u'" : "'i'")));
+
+
+ //re-check locks which were in Waiting state - should now be Acquired
+ ((DbLockManager)txnMgr2.getLockManager()).checkLock(extLockId);
+ locks = getLocks(txnMgr, true);
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "source", null, locks);
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", null, locks);
+
+ Assert.assertEquals(
+ "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid2) + "): " +
+ TxnDbUtil.queryToString("select * from TXN_COMPONENTS"),
+ 1,//
+ TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid2));
+ Assert.assertEquals(
+ "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid2) + "): " +
+ TxnDbUtil.queryToString("select * from TXN_COMPONENTS"),
+ 1,//
+ TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid2 +
+ "and tc_operation_type='d'"));
+
+ //complete T2 txn
+ LockException expectedException = null;
+ try {
+ txnMgr2.commitTxn();
+ }
+ catch (LockException e) {
+ expectedException = e;
+ }
+ if(causeConflict) {
+ Assert.assertTrue("Didn't get exception", expectedException != null);
+ Assert.assertEquals("Got wrong message code", ErrorMsg.TXN_ABORTED, expectedException.getCanonicalErrorMsg());
+ Assert.assertEquals("Exception msg didn't match",
+ "Aborting [txnid:3,3] due to a write conflict on default/target committed by [txnid:2,3] d/u",
+ expectedException.getCause().getMessage());
+ }
+ else {
+ Assert.assertEquals("WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnid1) + "): " +
+ TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 1,//Unpartitioned table: 1 row for Delete; Inserts are not tracked in WRITE_SET
+ TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_txnid=" + txnid2 +
+ " and ws_operation_type='d'"));
+ }
+ }
+ /**
+ * Check that DP with partial spec properly updates TXN_COMPONENTS
+ * @throws Exception
+ */
+ @Test
+ public void testDynamicPartitionInsert() throws Exception {
+ dropTable(new String[] {"target"});
+ checkCmdOnDriver(driver.run("create table target (a int, b int) " +
+ "partitioned by (p int, q int) clustered by (a) into 2 buckets " +
+ "stored as orc TBLPROPERTIES ('transactional'='true')"));
+ long txnid1 = txnMgr.openTxn("T1");
+ checkCmdOnDriver(driver.compileAndRespond("insert into target partition(p=1,q) values (1,2,2), (3,4,2), (5,6,3), (7,8,2)"));
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "T1");
+ List<ShowLocksResponseElement> locks = getLocks(txnMgr, true);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ //table is empty, so can only lock the table
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks);
+ Assert.assertEquals(
+ "HIVE_LOCKS mismatch(" + JavaUtils.txnIdToString(txnid1) + "): " +
+ TxnDbUtil.queryToString("select * from HIVE_LOCKS"),
+ 1,
+ TxnDbUtil.countQueryAgent("select count(*) from HIVE_LOCKS where hl_txnid=" + txnid1));
+ txnMgr.rollbackTxn();
+ Assert.assertEquals(
+ "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid1) + "): " +
+ TxnDbUtil.queryToString("select * from TXN_COMPONENTS"),
+ 0,
+ TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid1));
+ //now actually write to table to generate some partitions
+ checkCmdOnDriver(driver.run("insert into target partition(p=1,q) values (1,2,2), (3,4,2), (5,6,3), (7,8,2)"));
+ driver.run("select count(*) from target");
+ List<String> r = new ArrayList<>();
+ driver.getResults(r);
+ Assert.assertEquals("", "4", r.get(0));
+ Assert.assertEquals(//look in COMPLETED_TXN_COMPONENTS because driver.run() committed!!!!
+ "COMPLETED_TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid1 + 1) + "): " +
+ TxnDbUtil.queryToString("select * from COMPLETED_TXN_COMPONENTS"),
+ 2,//2 distinct partitions created
+ //txnid+1 because we want txn used by previous driver.run("insert....)
+ TxnDbUtil.countQueryAgent("select count(*) from COMPLETED_TXN_COMPONENTS where ctc_txnid=" + (txnid1 + 1)));
+
+
+ long txnid2 = txnMgr.openTxn("T1");
+ checkCmdOnDriver(driver.compileAndRespond("insert into target partition(p=1,q) values (10,2,2), (30,4,2), (50,6,3), (70,8,2)"));
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "T1");
+ locks = getLocks(txnMgr, true);
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ //Plan is using DummyPartition, so can only lock the table... unfortunately
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks);
+    AddDynamicPartitions adp = new AddDynamicPartitions(txnid2, "default", "target", Arrays.asList("p=1/q=2","p=1/q=3"));
+ adp.setOperationType(DataOperationType.INSERT);
+ txnHandler.addDynamicPartitions(adp);
+ Assert.assertEquals(
+ "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid2) + "): " +
+ TxnDbUtil.queryToString("select * from TXN_COMPONENTS"),
+ 2,//2 distinct partitions modified
+ TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid2));
+ txnMgr.commitTxn();
+ }
+ @Test
+ public void testMergePartitioned01() throws Exception {
+ testMergePartitioned(false);
+ }
+ @Test
+ public void testMergePartitioned02() throws Exception {
+ testMergePartitioned(true);
+ }
+  /**
+   * "run" an UPDATE and a MERGE concurrently; check that the correct locks are acquired
+   * and check the state of the auxiliary ACID tables.
+   * @param causeConflict true to make the operations cause a write conflict
+   * @throws Exception
+   */
+ private void testMergePartitioned(boolean causeConflict) throws Exception {
+ dropTable(new String[] {"target","source"});
+ checkCmdOnDriver(driver.run("create table target (a int, b int) " +
+ "partitioned by (p int, q int) clustered by (a) into 2 buckets " +
+ "stored as orc TBLPROPERTIES ('transactional'='true')"));
+ checkCmdOnDriver(driver.run("insert into target partition(p,q) values (1,2,1,2), (3,4,1,2), (5,6,1,3), (7,8,2,2)"));
+ checkCmdOnDriver(driver.run("create table source (a1 int, b1 int, p1 int, q1 int)"));
+
+ long txnId1 = txnMgr.openTxn("T1");
+ checkCmdOnDriver(driver.compileAndRespond("update target set b = 2 where p=1"));
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "T1");
+ List<ShowLocksResponseElement> locks = getLocks(txnMgr, true);
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=2", locks);
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=3", locks);
+
+ DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ //start a 2nd (overlapping) txn
+ long txnid2 = txnMgr2.openTxn("T2");
+ checkCmdOnDriver(driver.compileAndRespond("merge into target using source " +
+ "on target.p=source.p1 and target.a=source.a1 " +
+ "when matched then update set b = 11 " +
+ "when not matched then insert values(a1,b1,p1,q1)"));
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "T2", false);
+ locks = getLocks(txnMgr, true);
+ Assert.assertEquals("Unexpected lock count", 7, locks.size());
+    /**
+     * W locks from T1 are still there, so all locks from T2 block.
+     * The Update part of Merge requests W locks for each existing partition in target.
+     * The Insert part doesn't know which partitions may be written to: thus R lock on target table.
+     */
+ checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "source", null, locks);
+ long extLockId = checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "target", null, locks).getLockid();
+
+ checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "target", "p=1/q=2", locks);
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=2", locks);
+
+ checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "target", "p=1/q=3", locks);
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=3", locks);
+
+ checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "target", "p=2/q=2", locks);
+
+ Assert.assertEquals(
+ "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " +
+ TxnDbUtil.queryToString("select * from TXN_COMPONENTS"),
+ 0,//because it's using a DP write
+ TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId1));
+ //complete T1 transaction (simulate writing to 2 partitions)
+ AddDynamicPartitions adp = new AddDynamicPartitions(txnId1, "default", "target",
+ Arrays.asList("p=1/q=2","p=1/q=3"));
+ adp.setOperationType(DataOperationType.UPDATE);
+ txnHandler.addDynamicPartitions(adp);
+ Assert.assertEquals(
+ "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " +
+ TxnDbUtil.queryToString("select * from TXN_COMPONENTS"),
+ 2,
+ TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnId1 +
+ " and tc_operation_type='u'"));
+ txnMgr.commitTxn();//commit T1
+ Assert.assertEquals("WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnId1) + "): " +
+ TxnDbUtil.queryToString("select * from WRITE_SET"),
+ 2,//2 partitions updated
+ TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_txnid=" + txnId1 +
+ " and ws_operation_type='u'"));
+
+
+ //re-check locks which were in Waiting state - should now be Acquired
+ ((DbLockManager)txnMgr2.getLockManager()).checkLock(extLockId);
+ locks = getLocks(txnMgr, true);
+ Assert.assertEquals("Unexpected lock count", 5, locks.size());
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "source", null, locks);
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "target", null, locks);
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=2", locks);
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=1/q=3", locks);
+ checkLock(LockType.SHARED_WRITE, LockState.ACQUIRED, "default", "target", "p=2/q=2", locks);
+
+
+ Assert.assertEquals(
+ "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid2) + "): " +
+ TxnDbUtil.queryToString("select * from TXN_COMPONENTS"),
+ 0,//because it's using a DP write
+ TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid2));
+ //complete T2 txn
+ //simulate Insert into 2 partitions
+ adp = new AddDynamicPartitions(txnid2, "default", "target",
+ Arrays.asList("p=1/q=2","p=1/q=3"));
+ adp.setOperationType(DataOperationType.INSERT);
+ txnHandler.addDynamicPartitions(adp);
+ Assert.assertEquals(
+ "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid2) + "): " +
+ TxnDbUtil.queryToString("select * from TXN_COMPONENTS"),
+ 2,
+ TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid2 + " and tc_operation_type='i'"));
+ //simulate Update of 1 partitions; depending on causeConflict, choose one of the partitions
+ //which was modified by the T1 update stmt or choose a non-conflicting one
+ adp = new AddDynamicPartitions(txnid2, "default", "target",
+ Collections.singletonList(causeConflict ? "p=1/q=2" : "p=1/q=1"));
+ adp.setOperationType(DataOperationType.UPDATE);
+ txnHandler.addDynamicPartitions(adp);
+ Assert.assertEquals(
+ "TXN_COMPONENTS mismatch(" + JavaUtils.txnIdToString(txnid2) + "): " +
+ TxnDbUtil.queryToString("select * from TXN_COMPONENTS"),
+ 1,
+ TxnDbUtil.countQueryAgent("select count(*) from TXN_COMPONENTS where tc_txnid=" + txnid2 + " and tc_operation_type='u'"));
+
+
+ LockException expectedException = null;
+ try {
+ txnMgr2.commitTxn();
+ }
+ catch (LockException e) {
+ expectedException = e;
+ }
+ if(causeConflict) {
+ Assert.assertTrue("Didn't get exception", expectedException != null);
+ Assert.assertEquals("Got wrong message code", ErrorMsg.TXN_ABORTED, expectedException.getCanonicalErrorMsg());
+ Assert.assertEquals("Exception msg didn't match",
+ "Aborting [txnid:3,3] due to a write conflict on default/target/p=1/q=2 committed by [txnid:2,3] u/u",
+ expectedException.getCause().getMessage());
+ }
+ else {
+ Assert.assertEquals("WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnid2) + "): " +
+ TxnDbUtil.queryToString("select * from WRITE_SET"),
+        1,//1 partition updated
+ TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_txnid=" + txnid2 +
+ " and ws_operation_type='u'"));
+ Assert.assertEquals("WRITE_SET mismatch(" + JavaUtils.txnIdToString(txnid2) + "): " +
+ TxnDbUtil.queryToString("select * from WRITE_SET"),
+        1,//1 partition updated (and no other entries)
+ TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_txnid=" + txnid2));
+ }
+ }
+ //https://issues.apache.org/jira/browse/HIVE-15048
+ @Test
+ @Ignore("for some reason this fails with NPE in setUp() when run as part of the suite, but not standalone..")
+ public void testUpdateWithSubquery() throws Exception {
+ dropTable(new String[] {"target", "source"});
+ checkCmdOnDriver(driver.run("create table target (a int, b int) " +
+ "partitioned by (p int, q int) clustered by (a) into 2 buckets " +
+ "stored as orc TBLPROPERTIES ('transactional'='true')"));
+ checkCmdOnDriver(driver.run("create table source (a1 int, b1 int, p1 int, q1 int) clustered by (a1) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"));
+
+ checkCmdOnDriver(driver.run("insert into target partition(p,q) values (1,2,1,2), (3,4,1,2), (5,6,1,3), (7,8,2,2)"));
+
+ checkCmdOnDriver(driver.run(
+"update target set b = 1 where p in (select t.q1 from source t where t.a1=5)"));
+    /**
+     * The above query fails with invalid reference 'p' (in the subquery), same as if you use t.p.
+     * But before it fails, here are the inputs/outputs before/after UpdateDeleteSemanticAnalyzer:
+     * Before UDSA
+     * inputs: [default@target, default@target@p=1/q=2, default@target@p=1/q=3, default@target@p=2/q=2]
+     * outputs: [default@target]
+     *
+     * After UDSA
+     * inputs: [default@target, default@target@p=1/q=2, default@target@p=1/q=3, default@target@p=2/q=2]
+     * outputs: [default@target@p=1/q=2, default@target@p=1/q=3, default@target@p=2/q=2]
+     *
+     * So it looks like....
+     */
+ checkCmdOnDriver(driver.run(
+ "update source set b1 = 1 where p1 in (select t.q from target t where t.p=2)"));
+
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/test/org/apache/hadoop/hive/ql/parse/TestIUD.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestIUD.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestIUD.java
index 3d2e648..467de26 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestIUD.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestIUD.java
@@ -32,7 +32,6 @@ import org.junit.Test;
*/
public class TestIUD {
private static HiveConf conf;
-
private ParseDriver pd;
@BeforeClass
@@ -47,6 +46,9 @@ public class TestIUD {
}
ASTNode parse(String query) throws ParseException {
+ return parse(query, pd, conf);
+ }
+ static ASTNode parse(String query, ParseDriver pd, HiveConf conf) throws ParseException {
ASTNode nd = null;
try {
nd = pd.parse(query, new Context(conf));
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/test/org/apache/hadoop/hive/ql/parse/TestMergeStatement.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestMergeStatement.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestMergeStatement.java
new file mode 100644
index 0000000..7481e1a
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestMergeStatement.java
@@ -0,0 +1,246 @@
+package org.apache.hadoop.hive.ql.parse;
+
+import org.antlr.runtime.tree.RewriteEmptyStreamException;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+
+/**
+ * Testing parsing for SQL Merge statement
+ */
+public class TestMergeStatement {
+ private static HiveConf conf;
+ private ParseDriver pd;
+
+ @BeforeClass
+ public static void initialize() {
+ conf = new HiveConf(SemanticAnalyzer.class);
+ SessionState.start(conf);
+ }
+
+ @Before
+ public void setup() throws SemanticException, IOException {
+ pd = new ParseDriver();
+ }
+
+ ASTNode parse(String query) throws ParseException {
+ return TestIUD.parse(query, pd, conf);
+ }
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
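+  //the negative tests below use this rule to assert the exact parser error message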
+
+ @Test
+ public void test() throws ParseException {
+ ASTNode ast = parse(//using target.a breaks this
+ "MERGE INTO target USING source ON target.pk = source.pk WHEN MATCHED THEN UPDATE set a = source.b, c=d+1");
+ Assert.assertEquals(
+ "(tok_merge " +
+ "(tok_tabref (tok_tabname target)) " +
+ "(tok_tabref (tok_tabname source)) " +
+ "(= (. (tok_table_or_col target) pk) (. (tok_table_or_col source) pk)) " +
+ "(tok_matched " +
+ "(tok_update " +
+ "(tok_set_columns_clause " +
+ "(= (tok_table_or_col a) (. (tok_table_or_col source) b)) " +
+ "(= (tok_table_or_col c) (+ (tok_table_or_col d) 1))" +
+ ")" +
+ ")" +
+ ")" +
+ ")", ast.toStringTree());
+ }
+ @Test
+ public void test1() throws ParseException {
+ //testing MATCHED AND with CASE statement
+ ASTNode ast = parse(//using target.a breaks this
+ "MERGE INTO target USING source ON target.pk = source.pk WHEN MATCHED " +
+ "AND source.c2 < current_time() " +
+ "THEN UPDATE set a = source.b, b = case when c1 is null then c1 else c1 end");
+ Assert.assertEquals(
+ "(tok_merge " +
+ "(tok_tabref (tok_tabname target)) (tok_tabref (tok_tabname source)) (= (. (tok_table_or_col target) pk) (. (tok_table_or_col source) pk)) " +
+ "(tok_matched " +
+ "(tok_update " +
+ "(tok_set_columns_clause " +
+ "(= (tok_table_or_col a) (. (tok_table_or_col source) b)) " +
+ "(= (tok_table_or_col b) (tok_function when (tok_function tok_isnull (tok_table_or_col c1)) (tok_table_or_col c1) (tok_table_or_col c1)))" +
+ ")" +
+ ") " +
+ "(< (. (tok_table_or_col source) c2) (tok_function current_time)))" +
+ ")", ast.toStringTree());
+ }
+ @Test
+ public void test2() throws ParseException {
+    ASTNode ast = parse("MERGE INTO target USING source ON target.pk = source.pk WHEN MATCHED THEN DELETE");
+ Assert.assertEquals(
+ "(tok_merge " +
+ "(tok_tabref (tok_tabname target)) (tok_tabref (tok_tabname source)) (= (. (tok_table_or_col target) pk) (. (tok_table_or_col source) pk)) " +
+ "(tok_matched " +
+ "tok_delete)" +
+ ")", ast.toStringTree());
+ }
+ @Test
+ public void test3() throws ParseException {
+    ASTNode ast = parse("MERGE INTO target USING source ON target.pk = source.pk WHEN MATCHED AND target.a + source.b > 8 THEN DELETE");
+ Assert.assertEquals(
+ "(tok_merge " +
+ "(tok_tabref (tok_tabname target)) (tok_tabref (tok_tabname source)) (= (. (tok_table_or_col target) pk) (. (tok_table_or_col source) pk)) " +
+ "(tok_matched " +
+ "tok_delete " +
+ "(> (+ (. (tok_table_or_col target) a) (. (tok_table_or_col source) b)) 8))" +
+ ")", ast.toStringTree());
+ }
+ @Test
+ public void test4() throws ParseException {
+    ASTNode ast = parse(
+      "MERGE INTO target USING source ON target.pk = source.pk WHEN NOT MATCHED THEN INSERT VALUES(source.a, case when source.b is null then target.b else source.b end)");
+ Assert.assertEquals(
+ "(tok_merge " +
+ "(tok_tabref (tok_tabname target)) (tok_tabref (tok_tabname source)) (= (. (tok_table_or_col target) pk) (. (tok_table_or_col source) pk)) " +
+ "(tok_not_matched " +
+ "(tok_insert " +
+ "(tok_value_row " +
+ "(. (tok_table_or_col source) a) " +
+ "(tok_function when " +
+ "(tok_function tok_isnull (. (tok_table_or_col source) b)) (. (tok_table_or_col target) b) " +
+ "(. (tok_table_or_col source) b)" +
+ ")" +
+ ")" +
+ ")" +
+ ")" +
+ ")", ast.toStringTree());
+
+ }
+ /**
+ * both UPDATE and INSERT
+ * @throws ParseException
+ */
+ @Test
+ public void test5() throws ParseException {
+    ASTNode ast = parse(
+      "MERGE INTO target USING source ON target.pk = source.pk WHEN MATCHED THEN UPDATE set a = source.b, c=d+1 WHEN NOT MATCHED THEN INSERT VALUES(source.a, 2, current_date())");
+ Assert.assertEquals(
+ "(tok_merge " +
+ "(tok_tabref (tok_tabname target)) (tok_tabref (tok_tabname source)) (= (. (tok_table_or_col target) pk) (. (tok_table_or_col source) pk)) " +
+ "(tok_matched " +
+ "(tok_update " +
+ "(tok_set_columns_clause (= (tok_table_or_col a) (. (tok_table_or_col source) b)) (= (tok_table_or_col c) (+ (tok_table_or_col d) 1)))" +
+ ")" +
+ ") " +
+ "(tok_not_matched " +
+ "(tok_insert " +
+ "(tok_value_row " +
+ "(. (tok_table_or_col source) a) " +
+ "2 " +
+ "(tok_function current_date)" +
+ ")" +
+ ")" +
+ ")" +
+ ")", ast.toStringTree());
+
+ }
+ @Test
+ public void testNegative() throws ParseException {
+ expectedException.expect(ParseException.class);
+ expectedException.expectMessage("line 1:74 cannot recognize input near 'INSERT' '<EOF>' '<EOF>' in WHEN MATCHED THEN clause");
+ ASTNode ast = parse("MERGE INTO target USING source ON target.pk = source.pk WHEN MATCHED THEN INSERT");
+ }
+ @Test
+ public void testNegative1() throws ParseException {
+ expectedException.expect(ParseException.class);
+ expectedException.expectMessage("line 1:78 mismatched input 'DELETE' expecting INSERT near 'THEN' in WHEN NOT MATCHED clause");
+ ASTNode ast = parse("MERGE INTO target USING source ON target.pk = source.pk WHEN NOT MATCHED THEN DELETE");
+ }
+ @Test
+ public void test8() throws ParseException {
+ ASTNode ast = parse("MERGE INTO target USING source ON target.pk = source.pk WHEN MATCHED AND a = 1 THEN UPDATE set a = b WHEN MATCHED THEN DELETE");
+ }
+ @Test
+ public void test9() throws ParseException {
+ ASTNode ast = parse("MERGE INTO target USING source ON target.pk = source.pk " +
+ "WHEN MATCHED AND a = 1 THEN UPDATE set a = b " +
+ "WHEN MATCHED THEN DELETE " +
+ "WHEN NOT MATCHED AND d < e THEN INSERT VALUES(1,2)");
+ }
+ @Test
+ public void test10() throws ParseException {
+ ASTNode ast = parse("MERGE INTO target USING source ON target.pk = source.pk " +
+ "WHEN MATCHED AND a = 1 THEN DELETE " +
+ "WHEN MATCHED THEN UPDATE set a = b " +
+ "WHEN NOT MATCHED AND d < e THEN INSERT VALUES(1,2)");
+ }
+ /**
+ * we always expect 0 or 1 update/delete WHEN clause and 0 or 1 insert WHEN clause (and 1 or 2 WHEN clauses altogether)
+ * @throws ParseException
+ */
+ @Test
+ public void testNegative3() throws ParseException {
+ expectedException.expect(ParseException.class);
+ expectedException.expectMessage("line 1:119 cannot recognize input near 'INSERT' 'VALUES' '(' in WHEN MATCHED THEN clause");
+ ASTNode ast = parse("MERGE INTO target USING source ON target.pk = source.pk WHEN MATCHED AND a = 1 THEN UPDATE set a = b WHEN MATCHED THEN INSERT VALUES(1,2)");
+ }
+ /**
+ * here we reverse the order of WHEN MATCHED/WHEN NOT MATCHED - should we allow it?
+ * @throws ParseException
+ */
+ @Test
+ public void testNegative4() throws ParseException {
+ expectedException.expect(ParseException.class);
+ expectedException.expectMessage("line 1:104 missing EOF at 'WHEN' near ')'");
+ ASTNode ast = parse(
+ "MERGE INTO target USING source ON target.pk = source.pk WHEN NOT MATCHED THEN INSERT VALUES(a,source.b) WHEN MATCHED THEN DELETE");
+ }
+
+ /**
+   * why does this fail but the next one passes?
+ * @throws ParseException
+ */
+ @Test
+ public void testNegative5() throws ParseException {
+ expectedException.expect(ParseException.class);
+ expectedException.expectMessage("line 1:103 mismatched input '+' expecting ) near 'b' in value row constructor");
+ //todo: why does this fail but next one passes?
+ ASTNode ast = parse(
+ "MERGE INTO target USING source ON target.pk = source.pk WHEN NOT MATCHED THEN INSERT VALUES(a,source.b + 1)");
+ }
+ @Test
+ public void test6() throws ParseException {
+ ASTNode ast = parse(
+ "MERGE INTO target USING source ON target.pk = source.pk WHEN NOT MATCHED THEN INSERT VALUES(a,(source.b + 1))");
+ }
+ @Test
+ public void testNegative6() throws ParseException {
+ expectedException.expect(RewriteEmptyStreamException.class);
+ expectedException.expectMessage("rule whenClauses");
+ ASTNode ast = parse(
+ "MERGE INTO target USING source ON target.pk = source.pk");
+ }
+ @Test
+ public void test7() throws ParseException {
+ ASTNode ast = parse("merge into acidTbl" +
+ " using nonAcidPart2 source ON acidTbl.a = source.a2 " +
+ "WHEN MATCHED THEN UPDATE set b = source.b2 " +
+ "WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2)");
+ Assert.assertEquals(ast.toStringTree(),
+ "(tok_merge " +
+ "(tok_tabref (tok_tabname acidtbl)) (tok_tabref (tok_tabname nonacidpart2) source) " +
+ "(= (. (tok_table_or_col acidtbl) a) (. (tok_table_or_col source) a2)) " +
+ "(tok_matched " +
+ "(tok_update " +
+ "(tok_set_columns_clause (= (tok_table_or_col b) (. (tok_table_or_col source) b2))))) " +
+ "(tok_not_matched " +
+ "(tok_insert " +
+ "(tok_value_row (. (tok_table_or_col source) a2) (. (tok_table_or_col source) b2)))))");
+ }
+}
[3/3] hive git commit: HIVE-14943 Base Implementation (of HIVE-10924)
(Eugene Koifman, reviewed by Alan Gates)
Posted by ek...@apache.org.
HIVE-14943 Base Implementation (of HIVE-10924) (Eugene Koifman, reviewed by Alan Gates)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e00f909d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e00f909d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e00f909d
Branch: refs/heads/master
Commit: e00f909dd35ee46de5d9de493bb76e37ba4b6f74
Parents: 52ba014
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Sat Nov 12 12:20:01 2016 -0800
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Sat Nov 12 12:20:01 2016 -0800
----------------------------------------------------------------------
.../hive/metastore/LockComponentBuilder.java | 4 +
.../metastore/txn/CompactionTxnHandler.java | 5 +-
.../hadoop/hive/metastore/txn/TxnHandler.java | 51 +-
.../java/org/apache/hadoop/hive/ql/Context.java | 63 +-
.../java/org/apache/hadoop/hive/ql/Driver.java | 4 +-
.../org/apache/hadoop/hive/ql/ErrorMsg.java | 11 +-
.../apache/hadoop/hive/ql/exec/MoveTask.java | 4 +
.../apache/hadoop/hive/ql/hooks/ReadEntity.java | 3 +
.../hadoop/hive/ql/hooks/WriteEntity.java | 10 +
.../hadoop/hive/ql/lockmgr/DbTxnManager.java | 9 +-
.../hadoop/hive/ql/lockmgr/DummyTxnManager.java | 2 +-
.../hadoop/hive/ql/lockmgr/HiveTxnManager.java | 7 +-
.../BucketingSortingReduceSinkOptimizer.java | 7 +-
.../apache/hadoop/hive/ql/parse/ASTNode.java | 8 +
.../hadoop/hive/ql/parse/FromClauseParser.g | 6 +-
.../org/apache/hadoop/hive/ql/parse/HiveLexer.g | 2 +
.../apache/hadoop/hive/ql/parse/HiveParser.g | 60 +-
.../hadoop/hive/ql/parse/IdentifiersParser.g | 7 +-
.../hadoop/hive/ql/parse/QBParseInfo.java | 7 +
.../hadoop/hive/ql/parse/SelectClauseParser.g | 2 +-
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 155 +--
.../hive/ql/parse/SemanticAnalyzerFactory.java | 1 +
.../ql/parse/UpdateDeleteSemanticAnalyzer.java | 989 ++++++++++++++++---
.../hadoop/hive/ql/plan/CreateTableDesc.java | 4 +-
.../metastore/txn/TestCompactionTxnHandler.java | 1 +
.../apache/hadoop/hive/ql/TestTxnCommands.java | 35 +
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 417 +++++++-
.../ql/TestTxnCommands2WithSplitUpdate.java | 13 +-
.../hive/ql/lockmgr/TestDbTxnManager2.java | 645 +++++++++++-
.../apache/hadoop/hive/ql/parse/TestIUD.java | 4 +-
.../hive/ql/parse/TestMergeStatement.java | 246 +++++
31 files changed, 2464 insertions(+), 318 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/metastore/src/java/org/apache/hadoop/hive/metastore/LockComponentBuilder.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/LockComponentBuilder.java b/metastore/src/java/org/apache/hadoop/hive/metastore/LockComponentBuilder.java
index 3e8f193..e074152 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/LockComponentBuilder.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/LockComponentBuilder.java
@@ -102,6 +102,10 @@ public class LockComponentBuilder {
partNameSet = true;
return this;
}
+ public LockComponentBuilder setIsDynamicPartitionWrite(boolean t) {
+ component.setIsDynamicPartitionWrite(t);
+ return this;
+ }
/**
* Get the constructed lock component.
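
As a hedged aside on the new builder flag: the value is plumbed through from WriteEntity by DbTxnManager later in this commit. A minimal sketch of a lock component for a dynamic-partition ACID update (database/table names and the lock level are illustrative assumptions, not taken from this patch):

    // Sketch: setIsDynamicPartitionWrite() is the method added above; the
    // metastore uses the flag to skip the conservative TXN_COMPONENTS entry
    // at lock time (see the TxnHandler change below).
    LockComponent comp = new LockComponentBuilder()
        .setDbName("default")                       // illustrative
        .setTableName("acidtbl")                    // illustrative
        .setSemiShared()                            // assumed level for an update
        .setOperationType(DataOperationType.UPDATE)
        .setIsDynamicPartitionWrite(true)
        .build();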
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 75a4d87..9145fcc 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -43,9 +43,6 @@ class CompactionTxnHandler extends TxnHandler {
static final private String CLASS_NAME = CompactionTxnHandler.class.getName();
static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
- // Always access COMPACTION_QUEUE before COMPLETED_TXN_COMPONENTS
- // See TxnHandler for notes on how to deal with deadlocks. Follow those notes.
-
public CompactionTxnHandler() {
}
@@ -428,7 +425,7 @@ class CompactionTxnHandler extends TxnHandler {
}
/**
- * Clean up aborted transactions from txns that have no components in txn_components. The reson such
+ * Clean up aborted transactions from txns that have no components in txn_components. The reason such
txns exist can be that no work was done in this txn (e.g. Streaming opened TransactionBatch and
* abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called.
*/
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 547ee98..a815f2c 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -625,7 +625,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
rs = stmt.executeQuery(sqlGenerator.addLimitClause(1, "tc_operation_type " + conflictSQLSuffix));
if (rs.next()) {
close(rs);
- //here means currently committing txn performed update/delete and we should check WW conflict
+ //if we get here it means the currently committing txn performed update/delete and we should check WW conflict
/**
* This S4U will mutex with other commitTxn() and openTxns().
* -1 below makes txn intervals look like [3,3] [4,4] if all txns are serial
@@ -653,7 +653,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
*/
rs = stmt.executeQuery
(sqlGenerator.addLimitClause(1, "committed.ws_txnid, committed.ws_commit_id, committed.ws_database," +
- "committed.ws_table, committed.ws_partition, cur.ws_commit_id cur_ws_commit_id " +
+ "committed.ws_table, committed.ws_partition, cur.ws_commit_id cur_ws_commit_id, " +
+ "cur.ws_operation_type cur_op, committed.ws_operation_type committed_op " +
"from WRITE_SET committed INNER JOIN WRITE_SET cur " +
"ON committed.ws_database=cur.ws_database and committed.ws_table=cur.ws_table " +
//For partitioned table we always track writes at partition level (never at table)
@@ -677,7 +678,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
resource.append('/').append(partitionName);
}
String msg = "Aborting [" + JavaUtils.txnIdToString(txnid) + "," + rs.getLong(6) + "]" + " due to a write conflict on " + resource +
- " committed by " + committedTxn;
+ " committed by " + committedTxn + " " + rs.getString(7) + "/" + rs.getString(8);
close(rs);
//remove WRITE_SET info for current txn since it's about to abort
dbConn.rollback(undoWriteSetForCurrentTxn);
@@ -712,6 +713,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
int modCount = 0;
if ((modCount = stmt.executeUpdate(s)) < 1) {
//this can be reasonable for an empty txn START/COMMIT or read-only txn
+ //also an IUD with DP that didn't match any rows.
LOG.info("Expected to move at least one record from txn_components to " +
"completed_txn_components when committing txn! " + JavaUtils.txnIdToString(txnid));
}
@@ -884,14 +886,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
if (txnid > 0) {
List<String> rows = new ArrayList<>();
- /**
- * todo QueryPlan has BaseSemanticAnalyzer which has acidFileSinks list of FileSinkDesc
- * FileSinkDesc.table is ql.metadata.Table
- * Table.tableSpec which is TableSpec, which has specType which is SpecType
- * So maybe this can work to know that this is part of dynamic partition insert in which case
- * we'll get addDynamicPartitions() call and should not write TXN_COMPONENTS here.
- * In any case, that's an optimization for now; will be required when adding multi-stmt txns
- */
// For each component in this lock request,
// add an entry to the txn_components table
for (LockComponent lc : rqst.getComponent()) {
@@ -909,7 +903,18 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
case INSERT:
case UPDATE:
case DELETE:
- updateTxnComponents = true;
+ if(!lc.isSetIsDynamicPartitionWrite()) {
+ //must be an old client talking, i.e. we don't know if it's DP, so be conservative
+ updateTxnComponents = true;
+ }
+ else {
+ /**
+ * We know this is part of a DP operation and so we'll get an
+ * {@link #addDynamicPartitions(AddDynamicPartitions)} call with the list
+ * of partitions actually changed.
+ */
+ updateTxnComponents = !lc.isIsDynamicPartitionWrite();
+ }
break;
case SELECT:
updateTxnComponents = false;
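
For context, a hedged sketch of the client-side sequence this branch anticipates: a DP write acquires its locks with the flag set, so no TXN_COMPONENTS rows are written up front, and the partitions actually changed are reported afterwards. The API shapes below follow the metastore calls exercised by this patch's tests; variable names are illustrative:

    // After the DP write completes inside txn 'txnid', the client reports
    // the real partition set; this, not the overly wide lock information,
    // is what populates TXN_COMPONENTS:
    AddDynamicPartitions adp = new AddDynamicPartitions(
        txnid, "default", "acidtbl", Arrays.asList("p=1", "p=2"));
    adp.setOperationType(DataOperationType.UPDATE);
    txnHandler.addDynamicPartitions(adp);           // txnHandler is a TxnStore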
@@ -1544,22 +1549,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
if(rqst.isSetOperationType()) {
ot = OpertaionType.fromDataOperationType(rqst.getOperationType());
}
-
- //what if a txn writes the same table > 1 time...(HIVE-9675) let's go with this for now, but really
- //need to not write this in the first place, i.e. make this delete not needed
- //see enqueueLockWithRetry() - that's where we write to TXN_COMPONENTS
- String deleteSql = "delete from TXN_COMPONENTS where tc_txnid=" + rqst.getTxnid() + " and tc_database=" +
- quoteString(rqst.getDbname()) + " and tc_table=" + quoteString(rqst.getTablename());
- //we delete the entries made by enqueueLockWithRetry() since those are based on lock information which is
- //much "wider" than necessary in a lot of cases. Here on the other hand, we know exactly which
- //partitions have been written to. w/o this WRITE_SET would contain entries for partitions not actually
- //written to
- int modCount = stmt.executeUpdate(deleteSql);
List<String> rows = new ArrayList<>();
for (String partName : rqst.getPartitionnames()) {
rows.add(rqst.getTxnid() + "," + quoteString(rqst.getDbname()) + "," + quoteString(rqst.getTablename()) +
"," + quoteString(partName) + "," + quoteChar(ot.sqlConst));
}
+ int modCount = 0;
+ //record partitions that were written to
List<String> queries = sqlGenerator.createInsertValuesStmt(
"TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type)", rows);
for(String query : queries) {
@@ -2080,7 +2076,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
public int compare(LockInfo info1, LockInfo info2) {
- // We sort by state (acquired vs waiting) and then by LockType, they by id
+ // We sort by state (acquired vs waiting) and then by LockType, then by id
if (info1.state == LockState.ACQUIRED &&
info2.state != LockState .ACQUIRED) {
return -1;
@@ -2285,6 +2281,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
/**
* todo: Longer term we should pass this from client somehow - this would be an optimization; once
* that is in place make sure to build and test "writeSet" below using OperationType not LockType
+ * With SP we assume that the query modifies exactly the partitions it locked (not entirely
+ * realistic since Update/Delete may have some predicate that filters all records out of
+ * some partition(s), but plausible). For DP, we acquire locks very wide (all known partitions),
+ * but for most queries only a fraction will actually be updated. #addDynamicPartitions() tells
+ * us exactly which ones were written to. Thus using this trick to kill a query early for
+ * DP queries may be too restrictive.
*/
boolean isPartOfDynamicPartitionInsert = true;
try {
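
To make the SP/DP distinction in the comment above concrete, a hedged example in the style of the TestDbTxnManager2 cases (table and column names are illustrative):

    // SP: the partition is named in the statement, so the lock identifies
    // exactly what is written and WRITE_SET can be built from it directly.
    checkCmdOnDriver(driver.run("insert into T partition(p=1) select a, b from S"));
    // DP: locks are taken on all known partitions (assuming nonstrict DP
    // mode); only the later addDynamicPartitions() call reveals which
    // partitions were really written.
    checkCmdOnDriver(driver.run("insert into T partition(p) select a, b, p from S"));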
@@ -2567,6 +2569,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
(desiredLock.txnId != 0 && desiredLock.txnId == existingLock.txnId) ||
//txnId=0 means it's a select or IUD which does not write to ACID table, e.g
//insert overwrite table T partition(p=1) select a,b from T and autoCommit=true
+ // todo: fix comment as of HIVE-14988
(desiredLock.txnId == 0 && desiredLock.extLockId == existingLock.extLockId);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/Context.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 838d73e..4355d21 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -25,6 +25,7 @@ import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
+import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -44,13 +45,13 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.TaskRunner;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.lockmgr.DbTxnManager.Heartbeater;
import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.ExplainConfiguration;
import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
@@ -107,11 +108,6 @@ public class Context {
// Transaction manager for this query
protected HiveTxnManager hiveTxnManager;
- // Used to track what type of acid operation (insert, update, or delete) we are doing. Useful
- // since we want to change where bucket columns are accessed in some operators and
- // optimizations when doing updates and deletes.
- private AcidUtils.Operation acidOperation = AcidUtils.Operation.NOT_ACID;
-
private boolean needLockMgr;
private AtomicInteger sequencer = new AtomicInteger();
@@ -129,6 +125,53 @@ public class Context {
private Heartbeater heartbeater;
private boolean skipTableMasking;
+ /**
+ * This determines the prefix of the
+ * {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.Phase1Ctx#dest}
+ * name for a given subtree of the AST. Most of the times there is only 1 destination in a
+ * given tree but multi-insert has several and multi-insert representing MERGE must use
+ * different prefixes to encode the purpose of different Insert branches
+ */
+ private Map<ASTNode, DestClausePrefix> tree2DestNamePrefix;
+ public enum DestClausePrefix {
+ INSERT("insclause-"), UPDATE("updclause-"), DELETE("delclause-");
+ private final String prefix;
+ DestClausePrefix(String prefix) {
+ this.prefix = prefix;
+ }
+ public String toString() {
+ return prefix;
+ }
+ }
+ /**
+ * The suffix is always relative to a given ASTNode
+ */
+ public DestClausePrefix getDestNamePrefix(ASTNode curNode) {
+ //if there is no mapping, we want to default to "old" naming
+ assert curNode != null : "must supply curNode";
+ if(tree2DestNamePrefix == null || tree2DestNamePrefix.isEmpty()) {
+ return DestClausePrefix.INSERT;
+ }
+ do {
+ DestClausePrefix prefix = tree2DestNamePrefix.get(curNode);
+ if(prefix != null) {
+ return prefix;
+ }
+ curNode = (ASTNode) curNode.parent;
+ } while(curNode != null);
+ return DestClausePrefix.INSERT;
+ }
+ /**
+ * Will make SemanticAnalyzer.Phase1Ctx#dest in subtree rooted at 'tree' use 'prefix'
+ * @param tree
+ * @return previous prefix for 'tree' or null
+ */
+ public DestClausePrefix addDestNamePrefix(ASTNode tree, DestClausePrefix prefix) {
+ if(tree2DestNamePrefix == null) {
+ tree2DestNamePrefix = new IdentityHashMap<>();
+ }
+ return tree2DestNamePrefix.put(tree, prefix);
+ }
public Context(Configuration conf) throws IOException {
this(conf, generateExecutionId());
@@ -760,14 +803,6 @@ public class Context {
this.tryCount = tryCount;
}
- public void setAcidOperation(AcidUtils.Operation op) {
- acidOperation = op;
- }
-
- public AcidUtils.Operation getAcidOperation() {
- return acidOperation;
- }
-
public String getCboInfo() {
return cboInfo;
}
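
Taken together, the per-branch prefix map added above replaces the single per-query acidOperation flag removed from this class. A hedged usage sketch (the AST variable names are illustrative; since the lookup walks up to the nearest tagged ancestor, tagging each rewritten INSERT branch once is enough):

    // During the MERGE rewrite (UpdateDeleteSemanticAnalyzer), each branch
    // of the generated multi-insert is tagged with its true purpose:
    ctx.addDestNamePrefix(updateBranchAst, Context.DestClausePrefix.UPDATE);
    ctx.addDestNamePrefix(deleteBranchAst, Context.DestClausePrefix.DELETE);
    // doPhase1() then names destinations under a tagged branch accordingly,
    // e.g. "updclause-0" instead of "insclause-0":
    String dest = ctx.getDestNamePrefix(nodeUnderUpdateBranch).toString() + nextNum;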
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 923ef08..b77948b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1111,7 +1111,9 @@ public class Driver implements CommandProcessor {
if (haveAcidWrite()) {
for (FileSinkDesc desc : acidSinks) {
desc.setTransactionId(txnMgr.getCurrentTxnId());
- desc.setStatementId(txnMgr.getStatementId());
+ //it's possible to have > 1 FileSink writing to the same table/partition
+ //e.g. Merge stmt, multi-insert stmt when mixing DP and SP writes
+ desc.setStatementId(txnMgr.getWriteIdAndIncrement());
}
}
/*Note, we have to record snapshot after lock acquisition to prevent lost update problem
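
A hedged note on why a per-sink id matters here: a single MERGE with an UPDATE and a DELETE branch creates two FileSinks against the same table within one transaction, and each needs its own write id so their delta files cannot collide. The delta directory names below illustrate the convention and are not quoted from this patch:

    // Conceptually, for txn 17 running a two-branch MERGE:
    updateSinkDesc.setStatementId(txnMgr.getWriteIdAndIncrement());  // -> 0
    deleteSinkDesc.setStatementId(txnMgr.getWriteIdAndIncrement());  // -> 1
    // yielding distinct delta directories along the lines of
    // delta_0000017_0000017_0000 and delta_0000017_0000017_0001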
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 7ed3907..97fcd55 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -398,7 +398,7 @@ public enum ErrorMsg {
DISTINCT_NOT_SUPPORTED(10285, "Distinct keyword is not supported in current context"),
NONACID_COMPACTION_NOT_SUPPORTED(10286, "Compaction is not allowed on non-ACID table {0}.{1}", true),
- UPDATEDELETE_PARSE_ERROR(10290, "Encountered parse error while parsing rewritten update or " +
+ UPDATEDELETE_PARSE_ERROR(10290, "Encountered parse error while parsing rewritten merge/update or " +
"delete query"),
UPDATEDELETE_IO_ERROR(10291, "Encountered I/O error while parsing rewritten update or " +
"delete query"),
@@ -456,6 +456,10 @@ public enum ErrorMsg {
REPLACE_MATERIALIZED_WITH_VIEW(10401, "Attempt to replace materialized view {0} with view", true),
UPDATE_DELETE_VIEW(10402, "You cannot update or delete records in a view"),
MATERIALIZED_VIEW_DEF_EMPTY(10403, "Query for the materialized view rebuild could not be retrieved"),
+ MERGE_PREDIACTE_REQUIRED(10404, "MERGE statement with both UPDATE and DELETE clauses " +
+ "requires \"AND <boolean>\" on the 1st WHEN MATCHED clause of <{0}>", true),
+ MERGE_TOO_MANY_DELETE(10405, "MERGE statement can have at most 1 WHEN MATCHED ... DELETE clause: <{0}>", true),
+ MERGE_TOO_MANY_UPDATE(10406, "MERGE statement can have at most 1 WHEN MATCHED ... UPDATE clause: <{0}>", true),
//========================== 20000 range starts here ========================//
SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."),
SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. "
@@ -722,6 +726,11 @@ public enum ErrorMsg {
sb.append(":");
sb.append(getCharPositionInLine(tree));
}
+ public static String renderPosition(ASTNode n) {
+ StringBuilder sb = new StringBuilder();
+ ErrorMsg.renderPosition(sb, n);
+ return sb.toString();
+ }
public String getMsg(Tree tree) {
return getMsg((ASTNode) tree);
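
Hedged examples of what the three new MERGE checks reject, in the style of the TestMergeStatement cases; the grammar accepts these shapes, so enforcement happens during semantic analysis (see UpdateDeleteSemanticAnalyzer below):

    // MERGE_PREDIACTE_REQUIRED: UPDATE and DELETE both present, but the
    // 1st WHEN MATCHED clause carries no "AND <boolean>" filter:
    parse("MERGE INTO target USING source ON target.pk = source.pk " +
        "WHEN MATCHED THEN UPDATE set a = b " +
        "WHEN MATCHED THEN DELETE");
    // MERGE_TOO_MANY_DELETE / MERGE_TOO_MANY_UPDATE: more than one
    // WHEN MATCHED ... DELETE (or ... UPDATE) clause in one statement.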
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 8265af4..349f115 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -206,6 +206,10 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
}
Context ctx = driverContext.getCtx();
+ if(ctx.getHiveTxnManager().supportsAcid()) {
+ //Acid LM doesn't maintain getOutputLockObjects(); this 'if' just makes it more explicit
+ return;
+ }
HiveLockManager lockMgr = ctx.getHiveTxnManager().getLockManager();
WriteEntity output = ctx.getLoadTableOutputMap().get(ltd);
List<HiveLockObj> lockObjects = ctx.getOutputLockObjects().get(output);
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
index 3d7de69..b805904 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
@@ -53,6 +53,9 @@ public class ReadEntity extends Entity implements Serializable {
// important because in that case we shouldn't acquire a lock for it or authorize the read.
// These will be handled by the output to the table instead.
private boolean isUpdateOrDelete = false;
+ //https://issues.apache.org/jira/browse/HIVE-15048
+ public transient boolean isFromTopLevelQuery = true;
+
// For views, the entities can be nested - by default, entities are at the top level
// Must be deterministic order set for consistent q-test output across Java versions
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
index 9e18638..da8c1e2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
@@ -38,6 +38,7 @@ public class WriteEntity extends Entity implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(WriteEntity.class);
private boolean isTempURI = false;
+ private transient boolean isDynamicPartitionWrite = false;
public static enum WriteType {
DDL_EXCLUSIVE, // for use in DDL statements that require an exclusive lock,
@@ -221,5 +222,14 @@ public class WriteEntity extends Entity implements Serializable {
throw new RuntimeException("Unknown operation " + op.toString());
}
}
+ public boolean isDynamicPartitionWrite() {
+ return isDynamicPartitionWrite;
+ }
+ public void setDynamicPartitionWrite(boolean t) {
+ isDynamicPartitionWrite = t;
+ }
+ public String toDetailedString() {
+ return toString() + " Type=" + getTyp() + " WriteType=" + getWriteType() + " isDP=" + isDynamicPartitionWrite();
+ }
}
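
A hedged sketch of how the new flag travels from the planner to the lock manager; the constructor and the DbTxnManager line are from this patch, the surrounding glue is illustrative:

    WriteEntity output = new WriteEntity(destTable, WriteEntity.WriteType.UPDATE);
    output.setDynamicPartitionWrite(true);  // set by SemanticAnalyzer for DP sinks
    LOG.debug(output.toDetailedString());   // "... WriteType=UPDATE isDP=true"
    // DbTxnManager then copies it onto the lock component (see below):
    //   compBuilder.setIsDynamicPartitionWrite(output.isDynamicPartitionWrite());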
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index da7505b..867e445 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -321,6 +321,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
if(t != null && AcidUtils.isAcidTable(t)) {
compBuilder.setIsAcid(true);
}
+ compBuilder.setIsDynamicPartitionWrite(output.isDynamicPartitionWrite());
LockComponent comp = compBuilder.build();
LOG.debug("Adding lock component to lock request " + comp.toString());
rqstBuilder.addLockComponent(comp);
@@ -335,9 +336,6 @@ public class DbTxnManager extends HiveTxnManagerImpl {
}
List<HiveLock> locks = new ArrayList<HiveLock>(1);
- if(isTxnOpen()) {
- statementId++;
- }
LockState lockState = lockMgr.lock(rqstBuilder.build(), queryId, isBlocking, locks);
ctx.setHiveLocks(locks);
return lockState;
@@ -650,8 +648,9 @@ public class DbTxnManager extends HiveTxnManagerImpl {
return txnId;
}
@Override
- public int getStatementId() {
- return statementId;
+ public int getWriteIdAndIncrement() {
+ assert isTxnOpen();
+ return statementId++;
}
private static long getHeartbeatInterval(Configuration conf) throws LockException {
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
index 1d071a8..f001f59 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
@@ -63,7 +63,7 @@ class DummyTxnManager extends HiveTxnManagerImpl {
}
@Override
- public int getStatementId() {
+ public int getWriteIdAndIncrement() {
return 0;
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index 9b4a97f..5b9ad60 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -210,8 +210,9 @@ public interface HiveTxnManager {
long getCurrentTxnId();
/**
- * 0..N Id of current statement within currently opened transaction
+ * Should be thought of more as a unique write operation ID in a given txn (at QueryPlan level).
+ * Each statement writing data within a multi-statement txn should have a unique WriteId.
+ * Even a single statement (e.g. Merge, multi-insert) may generate several writes.
*/
- int getStatementId();
-
+ int getWriteIdAndIncrement();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
index da261bb..8f40998 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
@@ -401,9 +401,12 @@ public class BucketingSortingReduceSinkOptimizer extends Transform {
return null;
}
+ assert fsOp.getConf().getWriteType() == rsOp.getConf().getWriteType() :
+ "WriteType mismatch. fsOp is " + fsOp.getConf().getWriteType() +
+ "; rsOp is " + rsOp.getConf().getWriteType();
// Don't do this optimization with updates or deletes
- if (pGraphContext.getContext().getAcidOperation() == AcidUtils.Operation.UPDATE ||
- pGraphContext.getContext().getAcidOperation() == AcidUtils.Operation.DELETE){
+ if (fsOp.getConf().getWriteType() == AcidUtils.Operation.UPDATE ||
+ fsOp.getConf().getWriteType() == AcidUtils.Operation.DELETE) {
return null;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java
index 62f9d14..0e6d903 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ASTNode.java
@@ -42,6 +42,7 @@ public class ASTNode extends CommonTree implements Node,Serializable {
private transient ASTNode rootNode;
private transient boolean isValidASTStr;
private transient boolean visited = false;
+ transient String matchedText;
public ASTNode() {
}
@@ -347,4 +348,11 @@ public class ASTNode extends CommonTree implements Node,Serializable {
return rootNode.getMemoizedSubString(startIndx, endIndx);
}
+ /**
+ * The string that generated this node.
+ * Only set for a node if the parser grammar sets it for a particular rule
+ */
+ public String getMatchedText() {
+ return matchedText;
+ }
}
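
The two halves of this mechanism, as a hedged sketch: the parser populates the field from @after grammar actions (see FromClauseParser.g and IdentifiersParser.g below), and the MERGE rewrite can then splice the user's original SQL text into the rewritten statement instead of unparsing the AST. The child index follows the TOK_MERGE shape asserted in TestMergeStatement; the consumer side is illustrative:

    // Grammar side (ANTLR @after action, as added below):
    //   $expression.tree.matchedText = $expression.text;
    // Consumer side: reuse the ON condition verbatim in the rewritten text:
    ASTNode onExpr = (ASTNode) mergeAst.getChild(2);    // the ON <expression>
    rewrittenQueryStr.append(onExpr.getMatchedText());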
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g
index 26aca96..f8adb38 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/FromClauseParser.g
@@ -19,7 +19,7 @@ parser grammar FromClauseParser;
options
{
output=AST;
-ASTLabelType=CommonTree;
+ASTLabelType=ASTNode;
backtrack=false;
k=3;
}
@@ -142,7 +142,7 @@ tableAlias
fromSource
@init { gParent.pushMsg("from source", state); }
-@after { gParent.popMsg(state); }
+@after { $fromSource.tree.matchedText = $fromSource.text; gParent.popMsg(state); }
:
(LPAREN KW_VALUES) => fromSource0
| (LPAREN) => LPAREN joinSource RPAREN -> joinSource
@@ -278,7 +278,7 @@ searchCondition
// INSERT INTO <table> (col1,col2,...) SELECT * FROM (VALUES(1,2,3),(4,5,6),...) as Foo(a,b,c)
valueRowConstructor
@init { gParent.pushMsg("value row constructor", state); }
-@after { gParent.popMsg(state); }
+@after { $valueRowConstructor.tree.matchedText = $valueRowConstructor.text; gParent.popMsg(state); }
:
LPAREN precedenceUnaryPrefixExpression (COMMA precedenceUnaryPrefixExpression)* RPAREN -> ^(TOK_VALUE_ROW precedenceUnaryPrefixExpression+)
;
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
index 63c32a8..b467c51 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
@@ -336,6 +336,8 @@ KW_KEY: 'KEY';
KW_ABORT: 'ABORT';
KW_EXTRACT: 'EXTRACT';
KW_FLOOR: 'FLOOR';
+KW_MERGE: 'MERGE';
+KW_MATCHED: 'MATCHED';
// Operators
// NOTE: if you add a new function/operator, add it to sysFuncNames so that describe function _FUNC_ will work.
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
index 8aa39b0..bd53a36 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
@@ -20,7 +20,7 @@ options
{
tokenVocab=HiveLexer;
output=AST;
-ASTLabelType=CommonTree;
+ASTLabelType=ASTNode;
backtrack=false;
k=3;
}
@@ -384,6 +384,11 @@ TOK_ROLLBACK;
TOK_SET_AUTOCOMMIT;
TOK_CACHE_METADATA;
TOK_ABORT_TRANSACTIONS;
+TOK_MERGE;
+TOK_MATCHED;
+TOK_NOT_MATCHED;
+TOK_UPDATE;
+TOK_DELETE;
}
@@ -737,6 +742,7 @@ execStatement
| deleteStatement
| updateStatement
| sqlTransactionStatement
+ | mergeStatement
;
loadStatement
@@ -2670,3 +2676,55 @@ abortTransactionStatement
:
KW_ABORT KW_TRANSACTIONS ( Number )+ -> ^(TOK_ABORT_TRANSACTIONS ( Number )+)
;
+
+
+/*
+BEGIN SQL Merge statement
+*/
+mergeStatement
+@init { pushMsg("MERGE statement", state); }
+@after { popMsg(state); }
+ :
+ KW_MERGE KW_INTO tableName (KW_AS? identifier)? KW_USING fromSource KW_ON expression whenClauses ->
+ ^(TOK_MERGE ^(TOK_TABREF tableName identifier?) fromSource expression whenClauses)
+ ;
+/*
+Allow 0,1 or 2 WHEN MATCHED clauses and 0 or 1 WHEN NOT MATCHED
+Each WHEN clause may have AND <boolean predicate>.
+If 2 WHEN MATCHED clauses are present, 1 must be UPDATE, the other DELETE, and the 1st one
+must have AND <boolean predicate>
+*/
+whenClauses
+ :
+ (whenMatchedAndClause|whenMatchedThenClause)* whenNotMatchedClause?
+ ;
+whenNotMatchedClause
+@init { pushMsg("WHEN NOT MATCHED clause", state); }
+@after { popMsg(state); }
+ :
+ KW_WHEN KW_NOT KW_MATCHED (KW_AND expression)? KW_THEN KW_INSERT KW_VALUES valueRowConstructor ->
+ ^(TOK_NOT_MATCHED ^(TOK_INSERT valueRowConstructor) expression?)
+ ;
+whenMatchedAndClause
+@init { pushMsg("WHEN MATCHED AND clause", state); }
+@after { popMsg(state); }
+ :
+ KW_WHEN KW_MATCHED KW_AND expression KW_THEN updateOrDelete ->
+ ^(TOK_MATCHED updateOrDelete expression)
+ ;
+whenMatchedThenClause
+@init { pushMsg("WHEN MATCHED THEN clause", state); }
+@after { popMsg(state); }
+ :
+ KW_WHEN KW_MATCHED KW_THEN updateOrDelete ->
+ ^(TOK_MATCHED updateOrDelete)
+ ;
+updateOrDelete
+ :
+ KW_UPDATE setColumnsClause -> ^(TOK_UPDATE setColumnsClause)
+ |
+ KW_DELETE -> TOK_DELETE
+ ;
+/*
+END SQL Merge statement
+*/
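
One subtlety of the rewrite rules above, as a hedged note: the optional AND predicate lands after the action subtree in the generated AST for both TOK_MATCHED and TOK_NOT_MATCHED, so consumers should probe the child count rather than assume fixed positions. An illustrative helper:

    // For ^(TOK_NOT_MATCHED ^(TOK_INSERT valueRowConstructor) expression?)
    // and ^(TOK_MATCHED updateOrDelete expression?): child 0 is the action,
    // child 1, if present, is the extra AND predicate.
    private static ASTNode extraPredicate(ASTNode whenClause) {
      return whenClause.getChildCount() > 1
          ? (ASTNode) whenClause.getChild(1) : null;
    }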
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
index 2e40aa5..aa92739 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
@@ -19,7 +19,7 @@ parser grammar IdentifiersParser;
options
{
output=AST;
-ASTLabelType=CommonTree;
+ASTLabelType=ASTNode;
backtrack=false;
k=3;
}
@@ -383,7 +383,7 @@ intervalQualifiers
expression
@init { gParent.pushMsg("expression specification", state); }
-@after { gParent.popMsg(state); }
+@after { $expression.tree.matchedText = $expression.text; gParent.popMsg(state); }
:
precedenceOrExpression
;
@@ -459,6 +459,7 @@ precedencePlusOperator
;
precedencePlusExpression
+@after { $precedencePlusExpression.tree.matchedText = $precedencePlusExpression.text; }
:
precedenceStarExpression (precedencePlusOperator^ precedenceStarExpression)*
;
@@ -759,6 +760,8 @@ nonReserved
| KW_VALIDATE
| KW_NOVALIDATE
| KW_KEY
+ | KW_MERGE
+ | KW_MATCHED
;
//The following SQL2011 reserved keywords are used as function name only, but not as identifiers.
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
index 3a0402e..f549dff 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
@@ -181,6 +181,9 @@ public class QBParseInfo {
insertIntoTables.put(fullName.toLowerCase(), ast);
}
+ /**
+ * See also {@link #getInsertOverwriteTables()}
+ */
public boolean isInsertIntoTable(String dbName, String table) {
String fullName = dbName + "." + table;
return insertIntoTables.containsKey(fullName.toLowerCase());
@@ -188,6 +191,7 @@ public class QBParseInfo {
/**
* Check if a table is in the list to be inserted into
+ * See also {@link #getInsertOverwriteTables()}
* @param fullTableName table name in dbname.tablename format
* @return
*/
@@ -640,6 +644,9 @@ public class QBParseInfo {
this.isPartialScanAnalyzeCommand = isPartialScanAnalyzeCommand;
}
+ /**
+ * See also {@link #isInsertIntoTable(String)}
+ */
public Map<String, ASTNode> getInsertOverwriteTables() {
return insertOverwriteTables;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/SelectClauseParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SelectClauseParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/SelectClauseParser.g
index 3c6fa39..2c2e856 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SelectClauseParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SelectClauseParser.g
@@ -19,7 +19,7 @@ parser grammar SelectClauseParser;
options
{
output=AST;
-ASTLabelType=CommonTree;
+ASTLabelType=ASTNode;
backtrack=false;
k=3;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 17dfd03..8f5542b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.hive.conf.HiveConf.StrictChecks;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -72,6 +73,7 @@ import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryProperties;
import org.apache.hadoop.hive.ql.QueryState;
@@ -679,17 +681,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
this.ast = newAST;
}
- /**
- * Goes though the tabref tree and finds the alias for the table. Once found,
- * it records the table name-> alias association in aliasToTabs. It also makes
- * an association from the alias to the table AST in parse info.
- *
- * @return the alias of the table
- */
- private String processTable(QB qb, ASTNode tabref) throws SemanticException {
- // For each table reference get the table name
- // and the alias (if alias is not present, the table name
- // is used as an alias)
+ int[] findTabRefIdxs(ASTNode tabref) {
+ assert tabref.getType() == HiveParser.TOK_TABREF;
int aliasIndex = 0;
int propsIndex = -1;
int tsampleIndex = -1;
@@ -706,11 +699,12 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
aliasIndex = index;
}
}
-
+ return new int[] {aliasIndex, propsIndex, tsampleIndex, ssampleIndex};
+ }
+ String findSimpleTableName(ASTNode tabref, int aliasIndex) {
+ assert tabref.getType() == HiveParser.TOK_TABREF;
ASTNode tableTree = (ASTNode) (tabref.getChild(0));
- String tabIdName = getUnescapedName(tableTree).toLowerCase();
-
String alias;
if (aliasIndex != 0) {
alias = unescapeIdentifier(tabref.getChild(aliasIndex).getText());
@@ -718,6 +712,30 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
else {
alias = getUnescapedUnqualifiedTableName(tableTree);
}
+ return alias;
+ }
+ /**
+ * Goes through the tabref tree and finds the alias for the table. Once found,
+ * it records the table name-> alias association in aliasToTabs. It also makes
+ * an association from the alias to the table AST in parse info.
+ *
+ * @return the alias of the table
+ */
+ private String processTable(QB qb, ASTNode tabref) throws SemanticException {
+ // For each table reference get the table name
+ // and the alias (if alias is not present, the table name
+ // is used as an alias)
+ int[] indexes = findTabRefIdxs(tabref);
+ int aliasIndex = indexes[0];
+ int propsIndex = indexes[1];
+ int tsampleIndex = indexes[2];
+ int ssampleIndex = indexes[3];
+
+ ASTNode tableTree = (ASTNode) (tabref.getChild(0));
+
+ String tabIdName = getUnescapedName(tableTree).toLowerCase();
+
+ String alias = findSimpleTableName(tabref, aliasIndex);
if (propsIndex >= 0) {
Tree propsAST = tabref.getChild(propsIndex);
@@ -1423,7 +1441,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
doPhase1GetColumnAliasesFromSelect(ast, qbp);
qbp.setAggregationExprsForClause(ctx_1.dest, aggregations);
qbp.setDistinctFuncExprsForClause(ctx_1.dest,
- doPhase1GetDistinctFuncExprs(aggregations));
+ doPhase1GetDistinctFuncExprs(aggregations));
break;
case HiveParser.TOK_WHERE:
@@ -1438,7 +1456,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
qbp.addInsertIntoTable(tab_name, ast);
case HiveParser.TOK_DESTINATION:
- ctx_1.dest = "insclause-" + ctx_1.nextNum;
+ ctx_1.dest = this.ctx.getDestNamePrefix(ast).toString() + ctx_1.nextNum;
ctx_1.nextNum++;
boolean isTmpFileDest = false;
if (ast.getChildCount() > 0 && ast.getChild(0) instanceof ASTNode) {
@@ -1945,25 +1963,6 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(alias));
}
}
-
- // Disallow INSERT INTO on bucketized tables
- boolean isAcid = AcidUtils.isAcidTable(tab);
- boolean isTableWrittenTo = qb.getParseInfo().isInsertIntoTable(tab.getDbName(), tab.getTableName());
- if (isTableWrittenTo &&
- tab.getNumBuckets() > 0 && !isAcid) {
- throw new SemanticException(ErrorMsg.INSERT_INTO_BUCKETIZED_TABLE.
- getMsg("Table: " + tabName));
- }
- // Disallow update and delete on non-acid tables
- if ((updating() || deleting()) && !isAcid && isTableWrittenTo) {
- //isTableWrittenTo: delete from acidTbl where a in (select id from nonAcidTable)
- //so only assert this if we are actually writing to this table
- // Whether we are using an acid compliant transaction manager has already been caught in
- // UpdateDeleteSemanticAnalyzer, so if we are updating or deleting and getting nonAcid
- // here, it means the table itself doesn't support it.
- throw new SemanticException(ErrorMsg.ACID_OP_ON_NONACID_TABLE, tabName);
- }
-
if (tab.isView()) {
if (qb.getParseInfo().isAnalyzeCommand()) {
throw new SemanticException(ErrorMsg.ANALYZE_VIEW.getMsg());
@@ -2086,6 +2085,21 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
.getMsg(ast, "The class is " + outputFormatClass.toString()));
}
+ boolean isTableWrittenTo = qb.getParseInfo().isInsertIntoTable(ts.tableHandle.getDbName(),
+ ts.tableHandle.getTableName());
+ isTableWrittenTo |= (qb.getParseInfo().getInsertOverwriteTables().
+ get(getUnescapedName((ASTNode) ast.getChild(0), ts.tableHandle.getDbName())) != null);
+ assert isTableWrittenTo :
+ "Inconsistent data structure detected: we are writing to " + ts.tableHandle + " in " +
+ name + " but it's not in isInsertIntoTable() or getInsertOverwriteTables()";
+ // Disallow update and delete on non-acid tables
+ boolean isAcid = AcidUtils.isAcidTable(ts.tableHandle);
+ if ((updating(name) || deleting(name)) && !isAcid) {
+ // Whether we are using an acid compliant transaction manager has already been caught in
+ // UpdateDeleteSemanticAnalyzer, so if we are updating or deleting and getting nonAcid
+ // here, it means the table itself doesn't support it.
+ throw new SemanticException(ErrorMsg.ACID_OP_ON_NONACID_TABLE, ts.tableName);
+ }
// TableSpec ts is obtained from the query (user specified),
// which means the user didn't specify partitions in their query,
// but whether the table itself is partitioned is not known.
@@ -6421,7 +6435,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
if (dest_tab.getNumBuckets() > 0) {
enforceBucketing = true;
- if (updating() || deleting()) {
+ if (updating(dest) || deleting(dest)) {
partnCols = getPartitionColsFromBucketColsForUpdateDelete(input, true);
} else {
partnCols = getPartitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input, true);
@@ -6472,7 +6486,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
nullOrder.append(sortOrder == BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC ? 'a' : 'z');
}
input = genReduceSinkPlan(input, partnCols, sortCols, order.toString(), nullOrder.toString(),
- maxReducers, (AcidUtils.isAcidTable(dest_tab) ? getAcidType() : AcidUtils.Operation.NOT_ACID));
+ maxReducers, (AcidUtils.isAcidTable(dest_tab) ? getAcidType(dest) : AcidUtils.Operation.NOT_ACID));
reduceSinkOperatorsAddedByEnforceBucketingSorting.add((ReduceSinkOperator)input.getParentOperators().get(0));
ctx.setMultiFileSpray(multiFileSpray);
ctx.setNumFiles(numFiles);
@@ -6488,7 +6502,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
if ((dest_tab.getNumBuckets() > 0)) {
enforceBucketing = true;
- if (updating() || deleting()) {
+ if (updating(dest) || deleting(dest)) {
partnColsNoConvert = getPartitionColsFromBucketColsForUpdateDelete(input, false);
} else {
partnColsNoConvert = getPartitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input,
@@ -6646,7 +6660,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
if (!isNonNativeTable) {
AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID;
if (destTableIsAcid) {
- acidOp = getAcidType(table_desc.getOutputFileFormatClass());
+ acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest);
checkAcidConstraints(qb, table_desc, dest_tab);
}
ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp);
@@ -6666,7 +6680,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
// in the case of DP, we will register WriteEntity in MoveTask when the
// list of dynamically created partitions are known.
if ((dpCtx == null || dpCtx.getNumDPCols() == 0)) {
- output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable));
+ output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable, dest));
if (!outputs.add(output)) {
throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
.getMsg(dest_tab.getTableName()));
@@ -6675,8 +6689,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
if ((dpCtx != null) && (dpCtx.getNumDPCols() >= 0)) {
// No static partition specified
if (dpCtx.getNumSPCols() == 0) {
- output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable), false);
+ output = new WriteEntity(dest_tab, determineWriteType(ltd, isNonNativeTable, dest), false);
outputs.add(output);
+ output.setDynamicPartitionWrite(true);
}
// part of the partition specified
// Create a DummyPartition in this case. Since, the metastore does not store partial
@@ -6689,7 +6704,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
new DummyPartition(dest_tab, dest_tab.getDbName()
+ "@" + dest_tab.getTableName() + "@" + ppath,
partSpec);
- output = new WriteEntity(p, getWriteType(), false);
+ output = new WriteEntity(p, getWriteType(dest), false);
+ output.setDynamicPartitionWrite(true);
outputs.add(output);
} catch (HiveException e) {
throw new SemanticException(e.getMessage(), e);
@@ -6753,7 +6769,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
dest_part.isStoredAsSubDirectories(), conf);
AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID;
if (destTableIsAcid) {
- acidOp = getAcidType(table_desc.getOutputFileFormatClass());
+ acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest);
checkAcidConstraints(qb, table_desc, dest_tab);
}
ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp);
@@ -6762,9 +6778,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
ltd.setLbCtx(lbCtx);
loadTableWork.add(ltd);
-
if (!outputs.add(new WriteEntity(dest_part,
- determineWriteType(ltd, dest_tab.isNonNative())))) {
+ determineWriteType(ltd, dest_tab.isNonNative(), dest)))) {
+
throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
.getMsg(dest_tab.getTableName() + "@" + dest_part.getName()));
}
@@ -6925,7 +6941,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
ArrayList<ColumnInfo> vecCol = new ArrayList<ColumnInfo>();
- if (updating() || deleting()) {
+ if (updating(dest) || deleting(dest)) {
vecCol.add(new ColumnInfo(VirtualColumn.ROWID.getName(), VirtualColumn.ROWID.getTypeInfo(),
"", true));
} else {
@@ -6978,8 +6994,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
// If this is an insert, update, or delete on an ACID table then mark that so the
// FileSinkOperator knows how to properly write to it.
if (destTableIsAcid) {
- AcidUtils.Operation wt = updating() ? AcidUtils.Operation.UPDATE :
- (deleting() ? AcidUtils.Operation.DELETE : AcidUtils.Operation.INSERT);
+ AcidUtils.Operation wt = updating(dest) ? AcidUtils.Operation.UPDATE :
+ (deleting(dest) ? AcidUtils.Operation.DELETE : AcidUtils.Operation.INSERT);
fileSinkDesc.setWriteType(wt);
acidFileSinks.add(fileSinkDesc);
}
@@ -7150,7 +7166,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
}
// The numbers of input columns and output columns should match for regular query
- if (!updating() && !deleting() && inColumnCnt != outColumnCnt) {
+ if (!updating(dest) && !deleting(dest) && inColumnCnt != outColumnCnt) {
String reason = "Table " + dest + " has " + outColumnCnt
+ " columns, but query has " + inColumnCnt + " columns.";
throw new SemanticException(ErrorMsg.TARGET_TABLE_COLUMN_MISMATCH.getMsg(
@@ -7169,18 +7185,18 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
MetadataTypedColumnsetSerDe.class);
boolean isLazySimpleSerDe = table_desc.getDeserializerClass().equals(
LazySimpleSerDe.class);
- if (!isMetaDataSerDe && !deleting()) {
+ if (!isMetaDataSerDe && !deleting(dest)) {
// If we're updating, add the ROW__ID expression, then make the following column accesses
// offset by 1 so that we don't try to convert the ROW__ID
- if (updating()) {
+ if (updating(dest)) {
expressions.add(new ExprNodeColumnDesc(rowFields.get(0).getType(),
rowFields.get(0).getInternalName(), "", true));
}
// here only deals with non-partition columns. We deal with partition columns next
for (int i = 0; i < columnNumber; i++) {
- int rowFieldsOffset = updating() ? i + 1 : i;
+ int rowFieldsOffset = updating(dest) ? i + 1 : i;
ObjectInspector tableFieldOI = tableFields.get(i)
.getFieldObjectInspector();
TypeInfo tableFieldTypeInfo = TypeInfoUtils
@@ -7218,7 +7234,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
// deal with dynamic partition columns: convert ExprNodeDesc type to String??
if (dynPart && dpCtx != null && dpCtx.getNumDPCols() > 0) {
// DP columns starts with tableFields.size()
- for (int i = tableFields.size() + (updating() ? 1 : 0); i < rowFields.size(); ++i) {
+ for (int i = tableFields.size() + (updating(dest) ? 1 : 0); i < rowFields.size(); ++i) {
TypeInfo rowFieldTypeInfo = rowFields.get(i).getType();
ExprNodeDesc column = new ExprNodeColumnDesc(
rowFieldTypeInfo, rowFields.get(i).getInternalName(), "", true);
@@ -10525,7 +10541,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
}
/**
- * Planner specific stuff goen in here.
+ * Planner specific stuff goes in here.
*/
static class PlannerContext {
protected ASTNode child;
@@ -13045,18 +13061,19 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
}
}
- private WriteEntity.WriteType determineWriteType(LoadTableDesc ltd, boolean isNonNativeTable) {
+ private WriteEntity.WriteType determineWriteType(LoadTableDesc ltd, boolean isNonNativeTable, String dest) {
// Don't know the characteristics of non-native tables,
// and don't have a rational way to guess, so assume the most
// conservative case.
if (isNonNativeTable) return WriteEntity.WriteType.INSERT_OVERWRITE;
- else return (ltd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE : getWriteType());
- }
- private WriteEntity.WriteType getWriteType() {
- return updating() ? WriteEntity.WriteType.UPDATE :
- (deleting() ? WriteEntity.WriteType.DELETE : WriteEntity.WriteType.INSERT);
+ else return (ltd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE :
+ getWriteType(dest));
}
+ private WriteEntity.WriteType getWriteType(String dest) {
+ return updating(dest) ? WriteEntity.WriteType.UPDATE :
+ (deleting(dest) ? WriteEntity.WriteType.DELETE : WriteEntity.WriteType.INSERT);
+ }
private boolean isAcidOutputFormat(Class<? extends OutputFormat> of) {
Class<?>[] interfaces = of.getInterfaces();
for (Class<?> iface : interfaces) {
@@ -13069,28 +13086,28 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
// Note that this method assumes you have already decided this is an Acid table. It cannot
// figure out if a table is Acid or not.
- private AcidUtils.Operation getAcidType() {
- return deleting() ? AcidUtils.Operation.DELETE :
- (updating() ? AcidUtils.Operation.UPDATE :
+ private AcidUtils.Operation getAcidType(String destination) {
+ return deleting(destination) ? AcidUtils.Operation.DELETE :
+ (updating(destination) ? AcidUtils.Operation.UPDATE :
AcidUtils.Operation.INSERT);
}
- private AcidUtils.Operation getAcidType(Class<? extends OutputFormat> of) {
+ private AcidUtils.Operation getAcidType(Class<? extends OutputFormat> of, String dest) {
if (SessionState.get() == null || !SessionState.get().getTxnMgr().supportsAcid()) {
return AcidUtils.Operation.NOT_ACID;
} else if (isAcidOutputFormat(of)) {
- return getAcidType();
+ return getAcidType(dest);
} else {
return AcidUtils.Operation.NOT_ACID;
}
}
- protected boolean updating() {
- return false;
+ protected boolean updating(String destination) {
+ return destination.startsWith(Context.DestClausePrefix.UPDATE.toString());
}
- protected boolean deleting() {
- return false;
+ protected boolean deleting(String destination) {
+ return destination.startsWith(Context.DestClausePrefix.DELETE.toString());
}
// Make sure the proper transaction manager that supports ACID is being used
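
The net effect of threading 'dest' through these methods, as a hedged example: the destination name itself now carries the per-branch operation, so one MERGE-generated multi-insert can mix all three write types (the values below are illustrative):

    // Branch destinations produced via Context.getDestNamePrefix():
    String updDest = Context.DestClausePrefix.UPDATE.toString() + 0;  // "updclause-0"
    String delDest = Context.DestClausePrefix.DELETE.toString() + 1;  // "delclause-1"
    // updating(updDest) -> true, deleting(delDest) -> true, while the old
    // ctx.getAcidOperation() flag applied one operation to the whole query.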
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
index 4f0ead0..ed01a31 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
@@ -296,6 +296,7 @@ public final class SemanticAnalyzerFactory {
case HiveParser.TOK_UPDATE_TABLE:
case HiveParser.TOK_DELETE_FROM:
+ case HiveParser.TOK_MERGE:
return new UpdateDeleteSemanticAnalyzer(queryState);
case HiveParser.TOK_START_TRANSACTION:
[2/3] hive git commit: HIVE-14943 Base Implementation (of HIVE-10924)
(Eugene Koifman, reviewed by Alan Gates)
Posted by ek...@apache.org.
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
index 5b874e4..55a3735 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
@@ -18,7 +18,9 @@
package org.apache.hadoop.hive.ql.parse;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
@@ -26,15 +28,16 @@ import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConfUtil;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.Entity;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
@@ -46,7 +49,7 @@ import org.apache.hadoop.hive.ql.session.SessionState;
/**
* A subclass of the {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer} that just handles
- * update and delete statements. It works by rewriting the updates and deletes into insert
+ * update, delete and merge statements. It works by rewriting the updates and deletes into insert
* statements (since they are actually inserts) and then doing some patch up to make them work as
* updates and deletes instead.
*/
@@ -70,46 +73,249 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
switch (tree.getToken().getType()) {
case HiveParser.TOK_DELETE_FROM:
analyzeDelete(tree);
- return;
-
+ break;
case HiveParser.TOK_UPDATE_TABLE:
analyzeUpdate(tree);
- return;
-
+ break;
+ case HiveParser.TOK_MERGE:
+ analyzeMerge(tree);
+ break;
default:
throw new RuntimeException("Asked to parse token " + tree.getName() + " in " +
"UpdateDeleteSemanticAnalyzer");
}
+ cleanUpMetaColumnAccessControl();
+
}
}
-
- @Override
- protected boolean updating() {
- return ctx.getAcidOperation() == AcidUtils.Operation.UPDATE;
+ private boolean updating() {
+ return currentOperation == Operation.UPDATE;
}
-
- @Override
- protected boolean deleting() {
- return ctx.getAcidOperation() == AcidUtils.Operation.DELETE;
+ private boolean deleting() {
+ return currentOperation == Operation.DELETE;
}
private void analyzeUpdate(ASTNode tree) throws SemanticException {
- ctx.setAcidOperation(AcidUtils.Operation.UPDATE);
+ currentOperation = Operation.UPDATE;
reparseAndSuperAnalyze(tree);
}
private void analyzeDelete(ASTNode tree) throws SemanticException {
- ctx.setAcidOperation(AcidUtils.Operation.DELETE);
+ currentOperation = Operation.DELETE;
reparseAndSuperAnalyze(tree);
}
+ /**
+ * Append the list of partition columns to the Insert statement, i.e. the 1st set of partCol1,partCol2 in
+ * INSERT INTO T PARTITION(partCol1,partCol2...) SELECT col1, ... partCol1,partCol2...
+ */
+ private void addPartitionColsToInsert(List<FieldSchema> partCols, StringBuilder rewrittenQueryStr) {
+ // If the table is partitioned we have to put the partition() clause in
+ if (partCols != null && partCols.size() > 0) {
+ rewrittenQueryStr.append(" partition (");
+ boolean first = true;
+ for (FieldSchema fschema : partCols) {
+ if (first)
+ first = false;
+ else
+ rewrittenQueryStr.append(", ");
+ //would be nice if there was a way to determine if quotes are needed
+ rewrittenQueryStr.append(HiveUtils.unparseIdentifier(fschema.getName(), this.conf));
+ }
+ rewrittenQueryStr.append(")");
+ }
+ }
+ /**
+ * Append the list of partition columns to the Insert statement's select list, i.e. the 2nd set of partCol1,partCol2 in
+ * INSERT INTO T PARTITION(partCol1,partCol2...) SELECT col1, ... partCol1,partCol2...
+ * @param targetName simple target table name (i.e. name or alias)
+ */
+ private void addPartitionColsToSelect(List<FieldSchema> partCols, StringBuilder rewrittenQueryStr, String targetName) {
+ // If the table is partitioned, we need to select the partition columns as well.
+ if (partCols != null) {
+ for (FieldSchema fschema : partCols) {
+ rewrittenQueryStr.append(", ");
+ //would be nice if there was a way to determine if quotes are needed
+ if(targetName != null) {
+ rewrittenQueryStr.append(HiveUtils.unparseIdentifier(targetName, this.conf)).append('.');
+ }
+ rewrittenQueryStr.append(HiveUtils.unparseIdentifier(fschema.getName(), this.conf));
+ }
+ }
+ }
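A standalone sketch of what the two helpers above emit for a hypothetical target table t partitioned by (p1, p2); backtick quoting stands in for HiveUtils.unparseIdentifier.

import java.util.Arrays;
import java.util.List;

public class PartitionClauseSketch {
  // Stand-in for HiveUtils.unparseIdentifier: backtick-quote an identifier.
  static String quote(String id) { return "`" + id + "`"; }

  // Mirrors addPartitionColsToInsert: the PARTITION(...) clause of the rewritten INSERT.
  static void partitionClause(List<String> partCols, StringBuilder sb) {
    if (partCols == null || partCols.isEmpty()) return;
    sb.append(" partition (");
    for (int i = 0; i < partCols.size(); i++) {
      if (i > 0) sb.append(", ");
      sb.append(quote(partCols.get(i)));
    }
    sb.append(")");
  }

  // Mirrors addPartitionColsToSelect: the trailing partition columns of the rewritten select list.
  static void selectPartCols(List<String> partCols, StringBuilder sb, String targetName) {
    if (partCols == null) return;
    for (String col : partCols) {
      sb.append(", ");
      if (targetName != null) sb.append(quote(targetName)).append('.');
      sb.append(quote(col));
    }
  }

  public static void main(String[] args) {
    List<String> partCols = Arrays.asList("p1", "p2");
    StringBuilder sb = new StringBuilder("insert into table t");
    partitionClause(partCols, sb);
    sb.append(" select ROW__ID");
    selectPartCols(partCols, sb, "t");
    System.out.println(sb);
    // insert into table t partition (`p1`, `p2`) select ROW__ID, `t`.`p1`, `t`.`p2`
  }
}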
+ /**
+ * Assert that we are not asked to update a bucketing column or partition column
+ * @param colName the A in "SET A = B"
+ */
+ private void checkValidSetClauseTarget(ASTNode colName, List<FieldSchema> partCols,
+ List<String> bucketingCols) throws SemanticException {
+ String columnName = normalizeColName(colName.getText());
+ // Make sure this isn't one of the partitioning columns, that's not supported.
+ if (partCols != null) {
+ for (FieldSchema fschema : partCols) {
+ if (fschema.getName().equalsIgnoreCase(columnName)) {
+ throw new SemanticException(ErrorMsg.UPDATE_CANNOT_UPDATE_PART_VALUE.getMsg());
+ }
+ }
+ }
+ //updating bucket column should move row from one file to another - not supported
+ if(bucketingCols != null && bucketingCols.contains(columnName)) {
+ throw new SemanticException(ErrorMsg.UPDATE_CANNOT_UPDATE_BUCKET_VALUE,columnName);
+ }
+ }
+ private ASTNode findLHSofAssignment(ASTNode assignment) {
+ assert assignment.getToken().getType() == HiveParser.EQUAL :
+ "Expected set assignments to use equals operator but found " + assignment.getName();
+ ASTNode tableOrColTok = (ASTNode)assignment.getChildren().get(0);
+ assert tableOrColTok.getToken().getType() == HiveParser.TOK_TABLE_OR_COL :
+ "Expected left side of assignment to be table or column";
+ ASTNode colName = (ASTNode)tableOrColTok.getChildren().get(0);
+ assert colName.getToken().getType() == HiveParser.Identifier :
+ "Expected column name";
+ return colName;
+ }
+ private Map<String, ASTNode> collectSetColumnsAndExpressions(
+ ASTNode setClause,List<FieldSchema> partCols, List<String> bucketingCols, Set<String> setRCols)
+ throws SemanticException {
+ // An update needs to select all of the columns, as we rewrite the entire row. Also,
+ // we need to figure out which columns we are going to replace.
+ assert setClause.getToken().getType() == HiveParser.TOK_SET_COLUMNS_CLAUSE :
+ "Expected second child of update token to be set token";
+
+ // Get the children of the set clause, each of which should be a column assignment
+ List<? extends Node> assignments = setClause.getChildren();
+ // Must be deterministic order map for consistent q-test output across Java versions
+ Map<String, ASTNode> setCols = new LinkedHashMap<String, ASTNode>(assignments.size());
+ for (Node a : assignments) {
+ ASTNode assignment = (ASTNode)a;
+ ASTNode colName = findLHSofAssignment(assignment);
+ if(setRCols != null) {
+ addSetRCols((ASTNode) assignment.getChildren().get(1), setRCols);
+ }
+ checkValidSetClauseTarget(colName, partCols, bucketingCols);
+
+ String columnName = normalizeColName(colName.getText());
+ // This means that in UPDATE T SET x = _something_
+ // _something_ can be whatever is supported in SELECT _something_
+ setCols.put(columnName, (ASTNode)assignment.getChildren().get(1));
+ }
+ return setCols;
+ }
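The deterministic-order requirement above is the reason for LinkedHashMap: iteration follows insertion order, so the rewrite is reproducible across Java versions. A tiny sketch with hypothetical column names:

import java.util.LinkedHashMap;
import java.util.Map;

public class SetClauseOrderSketch {
  public static void main(String[] args) {
    // For "UPDATE T SET c2 = 5, c1 = current_date()" the collected map keeps
    // the order in which the assignments appear in the SET clause.
    Map<String, String> setCols = new LinkedHashMap<>();
    setCols.put("c2", "5");
    setCols.put("c1", "current_date()");
    System.out.println(setCols); // {c2=5, c1=current_date()}
  }
}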
+ /**
+ * @return the Metastore representation of the target table
+ */
+ private Table getTargetTable(ASTNode tabRef) throws SemanticException {
+ String[] tableName;
+ Table mTable;
+ switch (tabRef.getType()) {
+ case HiveParser.TOK_TABREF:
+ tableName = getQualifiedTableName((ASTNode) tabRef.getChild(0));
+ break;
+ case HiveParser.TOK_TABNAME:
+ tableName = getQualifiedTableName(tabRef);
+ break;
+ default:
+ throw raiseWrongType("TOK_TABREF|TOK_TABNAME", tabRef);
+ }
+ try {
+ mTable = db.getTable(tableName[0], tableName[1]);
+ } catch (InvalidTableException e) {
+ LOG.error("Failed to find table " + getDotName(tableName) + " got exception "
+ + e.getMessage());
+ throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(getDotName(tableName)), e);
+ } catch (HiveException e) {
+ LOG.error("Failed to find table " + getDotName(tableName) + " got exception "
+ + e.getMessage());
+ throw new SemanticException(e.getMessage(), e);
+ }
+ return mTable;
+ }
+ // Walk through all our inputs and set them to note that this read is part of an update or a
+ // delete.
+ private void markReadEntityForUpdate() {
+ for (ReadEntity input : inputs) {
+ if(isWritten(input)) {
+ //todo: this is actually not adding anything since LockComponent uses a Trie to "promote" a lock
+ //except by accident - when we have a partitioned target table we have a ReadEntity and WriteEntity
+ //for the table, so we mark ReadEntity and then delete WriteEntity (replace with Partition entries)
+ //so DbTxnManager skips Read lock on the ReadEntity....
+ input.setUpdateOrDelete(true);//input.noLockNeeded()?
+ }
+ }
+ }
+ /**
+ * For updates, we need to set the column access info so that it contains information on
+ * the columns we are updating.
+ * (But not all the columns of the target table, even though the rewritten query writes
+ * all columns of the target table, since that is an implementation detail.)
+ */
+ private void setUpAccessControlInfoForUpdate(Table mTable, Map<String, ASTNode> setCols) {
+ ColumnAccessInfo cai = new ColumnAccessInfo();
+ for (String colName : setCols.keySet()) {
+ cai.add(Table.getCompleteName(mTable.getDbName(), mTable.getTableName()), colName);
+ }
+ setUpdateColumnAccessInfo(cai);
+ }
+ /**
+ * We need to weed ROW__ID out of the input column info, as it doesn't make any sense to
+ * require the user to have authorization on that column.
+ */
+ private void cleanUpMetaColumnAccessControl() {
+ //we do this for Update/Delete (incl Merge) because we introduce this column into the query
+ //as part of rewrite
+ if (columnAccessInfo != null) {
+ columnAccessInfo.stripVirtualColumn(VirtualColumn.ROWID);
+ }
+ }
+ /**
+ * Parse the newly generated SQL statement to get a new AST
+ */
+ private ReparseResult parseRewrittenQuery(StringBuilder rewrittenQueryStr, String originalQuery) throws SemanticException {
+ // Parse the rewritten query string
+ Context rewrittenCtx;
+ try {
+ // Set dynamic partitioning to nonstrict so that queries do not need any partition
+ // references.
+ // todo: this may be a perf issue as it prevents the optimizer.. or not
+ HiveConf.setVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+ rewrittenCtx = new Context(conf);
+ rewrittenCtx.setExplainConfig(ctx.getExplainConfig());
+ } catch (IOException e) {
+ throw new SemanticException(ErrorMsg.UPDATEDELETE_IO_ERROR.getMsg());
+ }
+ rewrittenCtx.setCmd(rewrittenQueryStr.toString());
+
+ ParseDriver pd = new ParseDriver();
+ ASTNode rewrittenTree;
+ try {
+ LOG.info("Going to reparse <" + originalQuery + "> as \n<" + rewrittenQueryStr.toString() + ">");
+ rewrittenTree = pd.parse(rewrittenQueryStr.toString(), rewrittenCtx);
+ rewrittenTree = ParseUtils.findRootNonNullToken(rewrittenTree);
+
+ } catch (ParseException e) {
+ throw new SemanticException(ErrorMsg.UPDATEDELETE_PARSE_ERROR.getMsg(), e);
+ }
+ return new ReparseResult(rewrittenTree, rewrittenCtx);
+ }
+ /**
+ * Assert it supports Acid write
+ */
+ private void validateTargetTable(Table mTable) throws SemanticException {
+ if (mTable.getTableType() == TableType.VIRTUAL_VIEW ||
+ mTable.getTableType() == TableType.MATERIALIZED_VIEW) {
+ LOG.error("Table " + getDotName(new String[] {mTable.getDbName(), mTable.getTableName()}) + " is a view or materialized view");
+ throw new SemanticException(ErrorMsg.UPDATE_DELETE_VIEW.getMsg());
+ }
+ }
+ /**
+ * This supports update and delete statements
+ */
private void reparseAndSuperAnalyze(ASTNode tree) throws SemanticException {
List<? extends Node> children = tree.getChildren();
// The first child should be the table we are deleting from
ASTNode tabName = (ASTNode)children.get(0);
assert tabName.getToken().getType() == HiveParser.TOK_TABNAME :
"Expected tablename as first child of " + operation() + " but found " + tabName.getName();
- String[] tableName = getQualifiedTableName(tabName);
// Rewrite the delete or update into an insert. Crazy, but it works as deletes and update
// actually are inserts into the delta file in Hive. A delete
@@ -129,98 +335,31 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
// merge on read.
StringBuilder rewrittenQueryStr = new StringBuilder();
- Table mTable;
- try {
- mTable = db.getTable(tableName[0], tableName[1]);
- } catch (InvalidTableException e) {
- LOG.error("Failed to find table " + getDotName(tableName) + " got exception "
- + e.getMessage());
- throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(getDotName(tableName)), e);
- } catch (HiveException e) {
- LOG.error("Failed to find table " + getDotName(tableName) + " got exception "
- + e.getMessage());
- throw new SemanticException(e.getMessage(), e);
- }
-
- if (mTable.getTableType() == TableType.VIRTUAL_VIEW ||
- mTable.getTableType() == TableType.MATERIALIZED_VIEW) {
- LOG.error("Table " + getDotName(tableName) + " is a view or materialized view");
- throw new SemanticException(ErrorMsg.UPDATE_DELETE_VIEW.getMsg());
- }
+ Table mTable = getTargetTable(tabName);
+ validateTargetTable(mTable);
List<FieldSchema> partCols = mTable.getPartCols();
List<String> bucketingCols = mTable.getBucketCols();
rewrittenQueryStr.append("insert into table ");
- rewrittenQueryStr.append(getDotName(new String[] {
- HiveUtils.unparseIdentifier(tableName[0], this.conf),
- HiveUtils.unparseIdentifier(tableName[1], this.conf) }));
+ rewrittenQueryStr.append(getFullTableNameForSQL(tabName));
- // If the table is partitioned we have to put the partition() clause in
- if (partCols != null && partCols.size() > 0) {
- rewrittenQueryStr.append(" partition (");
- boolean first = true;
- for (FieldSchema fschema : partCols) {
- if (first)
- first = false;
- else
- rewrittenQueryStr.append(", ");
- rewrittenQueryStr.append(HiveUtils.unparseIdentifier(fschema.getName(), this.conf));
- }
- rewrittenQueryStr.append(")");
- }
+ addPartitionColsToInsert(partCols, rewrittenQueryStr);
rewrittenQueryStr.append(" select ROW__ID");
+
Map<Integer, ASTNode> setColExprs = null;
Map<String, ASTNode> setCols = null;
// Must be deterministic order set for consistent q-test output across Java versions
Set<String> setRCols = new LinkedHashSet<String>();
if (updating()) {
- // An update needs to select all of the columns, as we rewrite the entire row. Also,
- // we need to figure out which columns we are going to replace. We won't write the set
+ // We won't write the set
// expressions in the rewritten query. We'll patch that up later.
// The set list from update should be the second child (index 1)
assert children.size() >= 2 : "Expected update token to have at least two children";
ASTNode setClause = (ASTNode)children.get(1);
- assert setClause.getToken().getType() == HiveParser.TOK_SET_COLUMNS_CLAUSE :
- "Expected second child of update token to be set token";
-
- // Get the children of the set clause, each of which should be a column assignment
- List<? extends Node> assignments = setClause.getChildren();
- // Must be deterministic order map for consistent q-test output across Java versions
- setCols = new LinkedHashMap<String, ASTNode>(assignments.size());
- setColExprs = new HashMap<Integer, ASTNode>(assignments.size());
- for (Node a : assignments) {
- ASTNode assignment = (ASTNode)a;
- assert assignment.getToken().getType() == HiveParser.EQUAL :
- "Expected set assignments to use equals operator but found " + assignment.getName();
- ASTNode tableOrColTok = (ASTNode)assignment.getChildren().get(0);
- assert tableOrColTok.getToken().getType() == HiveParser.TOK_TABLE_OR_COL :
- "Expected left side of assignment to be table or column";
- ASTNode colName = (ASTNode)tableOrColTok.getChildren().get(0);
- assert colName.getToken().getType() == HiveParser.Identifier :
- "Expected column name";
-
- addSetRCols((ASTNode) assignment.getChildren().get(1), setRCols);
-
- String columnName = normalizeColName(colName.getText());
-
- // Make sure this isn't one of the partitioning columns, that's not supported.
- if (partCols != null) {
- for (FieldSchema fschema : partCols) {
- if (fschema.getName().equalsIgnoreCase(columnName)) {
- throw new SemanticException(ErrorMsg.UPDATE_CANNOT_UPDATE_PART_VALUE.getMsg());
- }
- }
- }
- //updating bucket column should move row from one file to another - not supported
- if(bucketingCols != null && bucketingCols.contains(columnName)) {
- throw new SemanticException(ErrorMsg.UPDATE_CANNOT_UPDATE_BUCKET_VALUE,columnName);
- }
- // This means that in UPDATE T SET x = _something_
- // _something_ can be whatever is supported in SELECT _something_
- setCols.put(columnName, (ASTNode)assignment.getChildren().get(1));
- }
+ setCols = collectSetColumnsAndExpressions(setClause, partCols, bucketingCols, setRCols);
+ setColExprs = new HashMap<Integer, ASTNode>(setClause.getChildCount());
List<FieldSchema> nonPartCols = mTable.getCols();
for (int i = 0; i < nonPartCols.size(); i++) {
@@ -237,17 +376,9 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
}
}
- // If the table is partitioned, we need to select the partition columns as well.
- if (partCols != null) {
- for (FieldSchema fschema : partCols) {
- rewrittenQueryStr.append(", ");
- rewrittenQueryStr.append(HiveUtils.unparseIdentifier(fschema.getName(), this.conf));
- }
- }
+ addPartitionColsToSelect(partCols, rewrittenQueryStr, null);
rewrittenQueryStr.append(" from ");
- rewrittenQueryStr.append(getDotName(new String[] {
- HiveUtils.unparseIdentifier(tableName[0], this.conf),
- HiveUtils.unparseIdentifier(tableName[1], this.conf) }));
+ rewrittenQueryStr.append(getFullTableNameForSQL(tabName));
ASTNode where = null;
int whereIndex = deleting() ? 1 : 2;
@@ -260,35 +391,21 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
// Add a sort by clause so that the row ids come out in the correct order
rewrittenQueryStr.append(" sort by ROW__ID ");
- // Parse the rewritten query string
- Context rewrittenCtx;
- try {
- // Set dynamic partitioning to nonstrict so that queries do not need any partition
- // references.
- HiveConf.setVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
- rewrittenCtx = new Context(conf);
- rewrittenCtx.setExplainConfig(ctx.getExplainConfig());
- } catch (IOException e) {
- throw new SemanticException(ErrorMsg.UPDATEDELETE_IO_ERROR.getMsg());
- }
- rewrittenCtx.setCmd(rewrittenQueryStr.toString());
- rewrittenCtx.setAcidOperation(ctx.getAcidOperation());
-
- ParseDriver pd = new ParseDriver();
- ASTNode rewrittenTree;
- try {
- LOG.info("Going to reparse " + operation() + " as <" + rewrittenQueryStr.toString() + ">");
- rewrittenTree = pd.parse(rewrittenQueryStr.toString(), rewrittenCtx);
- rewrittenTree = ParseUtils.findRootNonNullToken(rewrittenTree);
-
- } catch (ParseException e) {
- throw new SemanticException(ErrorMsg.UPDATEDELETE_PARSE_ERROR.getMsg(), e);
- }
+ ReparseResult rr = parseRewrittenQuery(rewrittenQueryStr, ctx.getCmd());
+ Context rewrittenCtx = rr.rewrittenCtx;
+ ASTNode rewrittenTree = rr.rewrittenTree;
ASTNode rewrittenInsert = (ASTNode)rewrittenTree.getChildren().get(1);
assert rewrittenInsert.getToken().getType() == HiveParser.TOK_INSERT :
"Expected TOK_INSERT as second child of TOK_QUERY but found " + rewrittenInsert.getName();
+ if(updating()) {
+ rewrittenCtx.addDestNamePrefix(rewrittenInsert, Context.DestClausePrefix.UPDATE);
+ }
+ else if(deleting()) {
+ rewrittenCtx.addDestNamePrefix(rewrittenInsert, Context.DestClausePrefix.DELETE);
+ }
+
if (where != null) {
// The structure of the AST for the rewritten insert statement is:
// TOK_QUERY -> TOK_FROM
@@ -334,42 +451,38 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
useSuper = false;
}
- // Walk through all our inputs and set them to note that this read is part of an update or a
- // delete.
- for (ReadEntity input : inputs) {
- if(isWritten(input)) {
- input.setUpdateOrDelete(true);
- }
- }
+ markReadEntityForUpdate();
if (inputIsPartitioned(inputs)) {
+ //todo: there are bugs here: https://issues.apache.org/jira/browse/HIVE-15048
// In order to avoid locking the entire write table we need to replace the single WriteEntity
// with a WriteEntity for each partition
+ assert outputs.size() == 1 : "expected 1 WriteEntity. Got " + outputs;//this asserts the comment above
+ WriteEntity original = null;
+ for(WriteEntity we : outputs) {
+ original = we;
+ }
outputs.clear();
for (ReadEntity input : inputs) {
+ /**
+ * The assumption here is that SemanticAnalyzer will generate a ReadEntity for each
+ * partition that exists and is matched by the WHERE clause (which may be all of them).
+ * Since we don't allow updating the value of a partition column, we know that we always
+ * write the same (or fewer) partitions than we read. Still, the write is a Dynamic
+ * Partition write - see HIVE-15032.
+ */
if (input.getTyp() == Entity.Type.PARTITION) {
WriteEntity.WriteType writeType = deleting() ? WriteEntity.WriteType.DELETE :
WriteEntity.WriteType.UPDATE;
- outputs.add(new WriteEntity(input.getPartition(), writeType));
+ WriteEntity we = new WriteEntity(input.getPartition(), writeType);
+ we.setDynamicPartitionWrite(original.isDynamicPartitionWrite());
+ outputs.add(we);
}
}
- } else {
- // We still need to patch up the WriteEntities as they will have an insert type. Change
- // them to the appropriate type for our operation.
- for (WriteEntity output : outputs) {
- output.setWriteType(deleting() ? WriteEntity.WriteType.DELETE :
- WriteEntity.WriteType.UPDATE);
- }
}
- // For updates, we need to set the column access info so that it contains information on
- // the columns we are updating.
if (updating()) {
- ColumnAccessInfo cai = new ColumnAccessInfo();
- for (String colName : setCols.keySet()) {
- cai.add(Table.getCompleteName(mTable.getDbName(), mTable.getTableName()), colName);
- }
- setUpdateColumnAccessInfo(cai);
+ setUpAccessControlInfoForUpdate(mTable, setCols);
// Add the setRCols to the input list
for (String colName : setRCols) {
@@ -379,14 +492,7 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
}
}
}
-
- // We need to weed ROW__ID out of the input column info, as it doesn't make any sense to
- // require the user to have authorization on that column.
- if (columnAccessInfo != null) {
- columnAccessInfo.stripVirtualColumn(VirtualColumn.ROWID);
- }
}
-
/**
* Check that {@code readEntity} is also being written
*/
@@ -400,10 +506,11 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
return false;
}
private String operation() {
- if (updating()) return "update";
- else if (deleting()) return "delete";
- else throw new IllegalStateException("UpdateDeleteSemanticAnalyzer neither updating nor " +
- "deleting, operation not known.");
+ if (currentOperation == Operation.NOT_ACID) {
+ throw new IllegalStateException("UpdateDeleteSemanticAnalyzer neither updating nor " +
+ "deleting, operation not known.");
+ }
+ return currentOperation.toString();
}
private boolean inputIsPartitioned(Set<ReadEntity> inputs) {
@@ -417,7 +524,7 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
return false;
}
- // This method find any columns on the right side of a set statement (thus rcols) and puts them
+ // This method finds any columns on the right side of a set statement (thus rcols) and puts them
// in a set so we can add them to the list of input cols to check.
private void addSetRCols(ASTNode node, Set<String> setRCols) {
@@ -443,4 +550,566 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
private static String normalizeColName(String colName) {
return colName.toLowerCase();
}
+
+ //todo: see SubQueryDiagnostic for some ideas on turning ASTNode into SQL
+ //todo: should we add MERGE to AcidUtils.Operation instead? that will be a lot of code clean up
+ private enum Operation {UPDATE, DELETE, MERGE, NOT_ACID};
+ private Operation currentOperation = Operation.NOT_ACID;
+ private static final String Indent = " ";
+
+ /**
+ * Here we take a Merge statement AST and generate a semantically equivalent multi-insert
+ * statement to execute. Each Insert leg represents a single WHEN clause. As much as possible,
+ * the new SQL statement is made to look like the input SQL statement, so that it is easier to map
+ * Query Compiler errors from the generated SQL back to the original.
+ * The generated SQL is a complete representation of the original input for the same reason.
+ * In many places SemanticAnalyzer throws exceptions that contain (line, position) coordinates.
+ * If generated SQL doesn't have everything and is patched up later, these coordinates point to
+ * the wrong place.
+ *
+ * @throws SemanticException
+ */
+ private void analyzeMerge(ASTNode tree) throws SemanticException {
+ currentOperation = Operation.MERGE;
+ /*
+ * See org.apache.hadoop.hive.ql.parse.TestMergeStatement for some examples of the merge AST
+ For example, given:
+ merge into acidTbl using nonAcidPart2 source ON acidTbl.a = source.a2
+ WHEN MATCHED THEN UPDATE set b = source.b2
+ WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2)
+
+ We get AST like this:
+ "(tok_merge " +
+ "(tok_tabname acidtbl) (tok_tabref (tok_tabname nonacidpart2) source) " +
+ "(= (. (tok_table_or_col acidtbl) a) (. (tok_table_or_col source) a2)) " +
+ "(tok_matched " +
+ "(tok_update " +
+ "(tok_set_columns_clause (= (tok_table_or_col b) (. (tok_table_or_col source) b2))))) " +
+ "(tok_not_matched " +
+ "tok_insert " +
+ "(tok_value_row (. (tok_table_or_col source) a2) (. (tok_table_or_col source) b2))))");
+
+ And need to produce a multi-insert like this to execute:
+ FROM acidTbl right outer join nonAcidPart2 ON acidTbl.a = source.a2
+ Insert into table acidTbl select nonAcidPart2.a2, nonAcidPart2.b2 where acidTbl.a is null
+ INSERT INTO TABLE acidTbl select target.ROW__ID, nonAcidPart2.a2, nonAcidPart2.b2 where nonAcidPart2.a2=acidTbl.a sort by acidTbl.ROW__ID
+ */
+ /*todo: we need some sort of validation phase over original AST to make things user friendly; for example, if
+ original command refers to a column that doesn't exist, this will be caught when processing the rewritten query but
+ the errors will point at locations that the user can't map to anything
+ - VALUES clause must have the same number of values as target table (including partition cols). Part cols go last in Select clause of Insert as Select
+ todo: do we care to preserve comments in original SQL?
+ todo: check if identifiers are properly escaped/quoted in the generated SQL - it's currently inconsistent
+ Look at UnparseTranslator.addIdentifierTranslation() - it does unescape + unparse...
+ todo: consider "WHEN NOT MATCHED BY SOURCE THEN UPDATE SET TargetTable.Col1 = SourceTable.Col1 "; what happens when source is empty? Maybe not a runtime error -
+ if the outer side of the ROJ is empty, the join produces 0 rows. If supporting WHEN NOT MATCHED BY SOURCE, then this should be a runtime error
+ */
+ ASTNode target = (ASTNode)tree.getChild(0);
+ ASTNode source = (ASTNode)tree.getChild(1);
+ String targetName = getSimpleTableName(target);
+ String sourceName = getSimpleTableName(source);
+ ASTNode onClause = (ASTNode) tree.getChild(2);
+
+ Table targetTable = getTargetTable(target);
+ validateTargetTable(targetTable);
+ List<ASTNode> whenClauses = findWhenClauses(tree);
+
+ StringBuilder rewrittenQueryStr = new StringBuilder("FROM\n");
+ rewrittenQueryStr.append(Indent).append(getFullTableNameForSQL(target));
+ if(isAliased(target)) {
+ rewrittenQueryStr.append(" ").append(targetName);
+ }
+ rewrittenQueryStr.append('\n');
+ rewrittenQueryStr.append(Indent).append(chooseJoinType(whenClauses)).append("\n");
+ if(source.getType() == HiveParser.TOK_SUBQUERY) {
+ //this includes the mandatory alias
+ rewrittenQueryStr.append(Indent).append(source.getMatchedText());
+ }
+ else {
+ rewrittenQueryStr.append(Indent).append(getFullTableNameForSQL(source));
+ if(isAliased(source)) {
+ rewrittenQueryStr.append(" ").append(sourceName);
+ }
+ }
+ rewrittenQueryStr.append('\n');
+ rewrittenQueryStr.append(Indent).append("ON ").append(onClause.getMatchedText()).append('\n');
+
+ /**
+ * We allow at most 2 WHEN MATCHED clauses, in which case one must be Update and the other Delete.
+ * If we have both update and delete, the 1st one (in SQL code) must have "AND <extra predicate>"
+ * so that the 2nd can ensure not to process the same rows.
+ * Update and Delete may be in any order. (Insert is always last)
+ */
+ String extraPredicate = null;
+ int numWhenMatchedUpdateClauses = 0, numWhenMatchedDeleteClauses = 0;
+ for(ASTNode whenClause : whenClauses) {
+ switch (getWhenClauseOperation(whenClause).getType()) {
+ case HiveParser.TOK_INSERT:
+ handleInsert(whenClause, rewrittenQueryStr, target, onClause, targetTable, targetName);
+ break;
+ case HiveParser.TOK_UPDATE:
+ numWhenMatchedUpdateClauses++;
+ String s = handleUpdate(whenClause, rewrittenQueryStr, target, onClause.getMatchedText(), targetTable, extraPredicate);
+ if(numWhenMatchedUpdateClauses + numWhenMatchedDeleteClauses == 1) {
+ extraPredicate = s;//i.e. it's the 1st WHEN MATCHED
+ }
+ break;
+ case HiveParser.TOK_DELETE:
+ numWhenMatchedDeleteClauses++;
+ String s1 = handleDelete(whenClause, rewrittenQueryStr, target, onClause.getMatchedText(), targetTable, extraPredicate);
+ if(numWhenMatchedUpdateClauses + numWhenMatchedDeleteClauses == 1) {
+ extraPredicate = s1;//i.e. it's the 1st WHEN MATCHED
+ }
+ break;
+ default:
+ throw new IllegalStateException("Unexpected WHEN clause type: " + whenClause.getType() +
+ addParseInfo(whenClause));
+ }
+ if(numWhenMatchedDeleteClauses > 1) {
+ throw new SemanticException(ErrorMsg.MERGE_TOO_MANY_DELETE, ctx.getCmd());
+ }
+ if(numWhenMatchedUpdateClauses > 1) {
+ throw new SemanticException(ErrorMsg.MERGE_TOO_MANY_UPDATE, ctx.getCmd());
+ }
+ }
+ if(numWhenMatchedDeleteClauses + numWhenMatchedUpdateClauses == 2 && extraPredicate == null) {
+ throw new SemanticException(ErrorMsg.MERGE_PREDIACTE_REQUIRED, ctx.getCmd());
+ }
+
+ ReparseResult rr = parseRewrittenQuery(rewrittenQueryStr, ctx.getCmd());
+ Context rewrittenCtx = rr.rewrittenCtx;
+ ASTNode rewrittenTree = rr.rewrittenTree;
+
+ //set dest name mapping on new context
+ for(int insClauseIdx = 1, whenClauseIdx = 0; insClauseIdx < rewrittenTree.getChildCount(); insClauseIdx++, whenClauseIdx++) {
+ //we've added Insert clauses in order of WHEN items in whenClauses
+ ASTNode insertClause = (ASTNode) rewrittenTree.getChild(insClauseIdx);
+ switch (getWhenClauseOperation(whenClauses.get(whenClauseIdx)).getType()) {
+ case HiveParser.TOK_INSERT:
+ rewrittenCtx.addDestNamePrefix(insertClause, Context.DestClausePrefix.INSERT);
+ break;
+ case HiveParser.TOK_UPDATE:
+ rewrittenCtx.addDestNamePrefix(insertClause, Context.DestClausePrefix.UPDATE);
+ break;
+ case HiveParser.TOK_DELETE:
+ rewrittenCtx.addDestNamePrefix(insertClause, Context.DestClausePrefix.DELETE);
+ break;
+ default:
+ assert false;
+ }
+ }
+ try {
+ useSuper = true;
+ super.analyze(rewrittenTree, rewrittenCtx);
+ } finally {
+ useSuper = false;
+ }
+
+ markReadEntityForUpdate();
+
+ if(targetTable.isPartitioned()) {
+ List<ReadEntity> partitionsRead = getRestrictedPartitionSet(targetTable);
+ if(!partitionsRead.isEmpty()) {
+ //if there is WriteEntity with WriteType=UPDATE/DELETE for target table, replace it with
+ //WriteEntity for each partition
+ List<WriteEntity> toRemove = new ArrayList<>();
+ for(WriteEntity we : outputs) {
+ WriteEntity.WriteType wt = we.getWriteType();
+ if(isTargetTable(we, targetTable) &&
+ (wt == WriteEntity.WriteType.UPDATE || wt == WriteEntity.WriteType.DELETE)) {
+ toRemove.add(we);
+ }
+ }
+ outputs.removeAll(toRemove);
+ for(ReadEntity re : partitionsRead) {
+ for(WriteEntity original : toRemove) {
+ //since we may have both Update and Delete branches, Auth needs to know
+ WriteEntity we = new WriteEntity(re.getPartition(), original.getWriteType());
+ we.setDynamicPartitionWrite(original.isDynamicPartitionWrite());
+ outputs.add(we);
+ }
+ }
+ }
+ }
+ }
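A standalone sketch of the mutual-exclusion rule enforced above when both WHEN MATCHED UPDATE and WHEN MATCHED DELETE are present: the first clause's extra predicate is appended to its own leg and negated on the other leg, so no row is processed by both. The SQL fragments are illustrative.

public class MatchedClausePredicateSketch {
  // Builds the WHERE clause of one WHEN MATCHED leg: the ON clause, plus the
  // leg's own AND <predicate> if any, plus NOT(<first leg's predicate>) when
  // this is the second WHEN MATCHED leg.
  static String whereClause(String onClause, String ownPredicate, String otherLegPredicate) {
    StringBuilder where = new StringBuilder(onClause);
    if (ownPredicate != null) where.append(" AND ").append(ownPredicate);
    if (otherLegPredicate != null) where.append(" AND NOT(").append(otherLegPredicate).append(")");
    return where.toString();
  }

  public static void main(String[] args) {
    String on = "t.a = s.a2";
    // WHEN MATCHED AND s.b2 > 7 THEN UPDATE ... WHEN MATCHED THEN DELETE
    System.out.println(whereClause(on, "s.b2 > 7", null)); // update leg
    System.out.println(whereClause(on, null, "s.b2 > 7")); // delete leg gets the rows the update leg skipped
  }
}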
+ /**
+ * If the optimizer has determined that it only has to read some of the partitions of the
+ * target table to satisfy the query, then we know that the write side of update/delete
+ * (and update/delete parts of merge)
+ * can only write (at most) that set of partitions (since we currently don't allow updating
+ * partition (or bucket) columns). So we want to replace the table level
+ * WriteEntity in the outputs with WriteEntity for each of these partitions
+ * ToDo: see if this should be moved to SemanticAnalyzer itself since it applies to any
+ * insert which does a select against the same table. Then SemanticAnalyzer would also
+ * be able to not use DP for the Insert...
+ *
+ * Note that the Insert of Merge may be creating new partitions and writing to partitions
+ * which were not read (WHEN NOT MATCHED...)
+ */
+ private List<ReadEntity> getRestrictedPartitionSet(Table targetTable) {
+ List<ReadEntity> partitionsRead = new ArrayList<>();
+ for(ReadEntity re : inputs) {
+ if(re.isFromTopLevelQuery && re.getType() == Entity.Type.PARTITION && isTargetTable(re, targetTable)) {
+ partitionsRead.add(re);
+ }
+ }
+ return partitionsRead;
+ }
+ /**
+ * if there is no WHEN NOT MATCHED THEN INSERT, we don't outer join
+ */
+ private String chooseJoinType(List<ASTNode> whenClauses) {
+ for(ASTNode whenClause : whenClauses) {
+ if(getWhenClauseOperation(whenClause).getType() == HiveParser.TOK_INSERT) {
+ return "RIGHT OUTER JOIN";
+ }
+ }
+ return "INNER JOIN";
+ }
+ /**
+ * does this Entity belong to target table (partition)
+ */
+ private boolean isTargetTable(Entity entity, Table targetTable) {
+ //todo: https://issues.apache.org/jira/browse/HIVE-15048
+ /**
+ * is this the right way to compare? Should it just compare paths?
+ * equals() impl looks heavy weight
+ */
+ return targetTable.equals(entity.getTable());
+ }
+ /**
+ * @param onClauseAsString - because there is no clone() and we need to use it in multiple places
+ * @param deleteExtraPredicate - see notes at caller
+ */
+ private String handleUpdate(ASTNode whenMatchedUpdateClause, StringBuilder rewrittenQueryStr,
+ ASTNode target, String onClauseAsString, Table targetTable,
+ String deleteExtraPredicate) throws SemanticException {
+ assert whenMatchedUpdateClause.getType() == HiveParser.TOK_MATCHED;
+ assert getWhenClauseOperation(whenMatchedUpdateClause).getType() == HiveParser.TOK_UPDATE;
+ List<FieldSchema> partCols = targetTable.getPartCols();
+ List<String> bucketingCols = targetTable.getBucketCols();
+ String targetName = getSimpleTableName(target);
+ rewrittenQueryStr.append("INSERT INTO ").append(getFullTableNameForSQL(target));
+ addPartitionColsToInsert(partCols, rewrittenQueryStr);
+ rewrittenQueryStr.append("\n select ").append(targetName).append(".ROW__ID");
+
+ ASTNode setClause = (ASTNode)getWhenClauseOperation(whenMatchedUpdateClause).getChild(0);
+ //columns being updated -> update expressions; "setRCols" (last param) is null because we use actual expressions
+ //before reparsing, i.e. they are known to SemanticAnalyzer logic
+ Map<String, ASTNode> setColsExprs = collectSetColumnsAndExpressions(setClause, partCols, bucketingCols, null);
+ //if target table has cols c1,c2,c3 and p1 partition col and we had "SET c2 = 5, c1 = current_date()" we want to end up with
+ //insert into target (p1) select current_date(), 5, c3, p1 where ....
+ //since we take the RHS of set exactly as it was in Input, we don't need to deal with quoting/escaping column/table names
+ List<FieldSchema> nonPartCols = targetTable.getCols();
+ for(FieldSchema fs : nonPartCols) {
+ rewrittenQueryStr.append(", ");
+ String name = fs.getName();
+ if (setColsExprs.containsKey(name)) {
+ rewrittenQueryStr.append(setColsExprs.get(name).getMatchedText());
+ }
+ else {
+ //todo: is this the right way to get <table>.<column> for target?
+ rewrittenQueryStr.append(getSimpleTableName(target)).append(".").append(HiveUtils.unparseIdentifier(name, this.conf));
+ }
+ }
+ addPartitionColsToSelect(partCols, rewrittenQueryStr, targetName);
+ rewrittenQueryStr.append("\n WHERE ").append(onClauseAsString);
+ String extraPredicate = getWhenClausePredicate(whenMatchedUpdateClause);
+ if(extraPredicate != null) {
+ //we have WHEN MATCHED AND <boolean expr> THEN DELETE
+ rewrittenQueryStr.append(" AND ").append(extraPredicate);
+ }
+ if(deleteExtraPredicate != null) {
+ rewrittenQueryStr.append(" AND NOT(").append(deleteExtraPredicate).append(")");
+ }
+ rewrittenQueryStr.append("\n sort by ");
+ rewrittenQueryStr.append(targetName).append(".ROW__ID \n");
+
+ setUpAccessControlInfoForUpdate(targetTable, setColsExprs);
+ //we don't deal with columns on RHS of SET expression since the whole expr is part of the
+ //rewritten SQL statement and is thus handled by SemanticAnalyzer. Nor do we have to
+ //figure which cols on RHS are from source and which from target
+
+ return extraPredicate;
+ }
+ /**
+ * @param onClauseAsString - because there is no clone() and we need to use it in multiple places
+ * @param updateExtraPredicate - see notes at caller
+ */
+ private String handleDelete(ASTNode whenMatchedDeleteClause, StringBuilder rewrittenQueryStr, ASTNode target,
+ String onClauseAsString, Table targetTable, String updateExtraPredicate) throws SemanticException {
+ assert whenMatchedDeleteClause.getType() == HiveParser.TOK_MATCHED;
+ assert getWhenClauseOperation(whenMatchedDeleteClause).getType() == HiveParser.TOK_DELETE;
+ List<FieldSchema> partCols = targetTable.getPartCols();
+ String targetName = getSimpleTableName(target);
+ rewrittenQueryStr.append("INSERT INTO ").append(getFullTableNameForSQL(target));
+ addPartitionColsToInsert(partCols, rewrittenQueryStr);
+
+ rewrittenQueryStr.append("\n select ").append(targetName).append(".ROW__ID ");
+ addPartitionColsToSelect(partCols, rewrittenQueryStr, targetName);
+ rewrittenQueryStr.append("\n WHERE ").append(onClauseAsString);
+ String extraPredicate = getWhenClausePredicate(whenMatchedDeleteClause);
+ if(extraPredicate != null) {
+ //we have WHEN MATCHED AND <boolean expr> THEN DELETE
+ rewrittenQueryStr.append(" AND ").append(extraPredicate);
+ }
+ if(updateExtraPredicate != null) {
+ rewrittenQueryStr.append(" AND NOT(").append(updateExtraPredicate).append(")");
+ }
+ rewrittenQueryStr.append("\n sort by ");
+ rewrittenQueryStr.append(targetName).append(".ROW__ID \n");
+ return extraPredicate;
+ }
+ private static String addParseInfo(ASTNode n) {
+ return " at " + ErrorMsg.renderPosition(n);
+ }
+
+ /**
+ * Returns the table name to use in the generated query preserving original quotes/escapes if any
+ * @see #getFullTableNameForSQL(ASTNode)
+ */
+ private String getSimpleTableName(ASTNode n) throws SemanticException {
+ return HiveUtils.unparseIdentifier(getSimpleTableNameBase(n), this.conf);
+ }
+ private String getSimpleTableNameBase(ASTNode n) throws SemanticException {
+ switch (n.getType()) {
+ case HiveParser.TOK_TABREF:
+ int aliasIndex = findTabRefIdxs(n)[0];
+ if (aliasIndex != 0) {
+ return n.getChild(aliasIndex).getText();//the alias
+ }
+ return getSimpleTableNameBase((ASTNode) n.getChild(0));
+ case HiveParser.TOK_TABNAME:
+ if(n.getChildCount() == 2) {
+ //db.table -> return table
+ return n.getChild(1).getText();
+ }
+ return n.getChild(0).getText();
+ case HiveParser.TOK_SUBQUERY:
+ return n.getChild(1).getText();//the alias
+ default:
+ throw raiseWrongType("TOK_TABREF|TOK_TABNAME|TOK_SUBQUERY", n);
+ }
+ }
+ /**
+ * @return table name in db.table form with proper quoting/escaping to be used in a SQL statement
+ */
+ private String getFullTableNameForSQL(ASTNode n) throws SemanticException {
+ switch (n.getType()) {
+ case HiveParser.TOK_TABNAME:
+ String[] tableName = getQualifiedTableName(n);
+ return getDotName(new String[] {
+ HiveUtils.unparseIdentifier(tableName[0], this.conf),
+ HiveUtils.unparseIdentifier(tableName[1], this.conf) });
+ case HiveParser.TOK_TABREF:
+ return getFullTableNameForSQL((ASTNode) n.getChild(0));
+ default:
+ throw raiseWrongType("TOK_TABNAME", n);
+ }
+ }
+ private static final class ReparseResult {
+ private final ASTNode rewrittenTree;
+ private final Context rewrittenCtx;
+ ReparseResult(ASTNode n, Context c) {
+ rewrittenTree = n;
+ rewrittenCtx = c;
+ }
+ }
+ private static IllegalArgumentException raiseWrongType(String expectedTokName, ASTNode n) {
+ return new IllegalArgumentException("Expected " + expectedTokName + "; got " + n.getType());
+ }
+ private boolean isAliased(ASTNode n) {
+ switch (n.getType()) {
+ case HiveParser.TOK_TABREF:
+ return findTabRefIdxs(n)[0] != 0;
+ case HiveParser.TOK_TABNAME:
+ return false;
+ case HiveParser.TOK_SUBQUERY:
+ assert n.getChildCount() > 1 : "Expected Derived Table to be aliased";
+ return true;
+ default:
+ throw raiseWrongType("TOK_TABREF|TOK_TABNAME", n);
+ }
+ }
+ /**
+ * Collect WHEN clauses from Merge statement AST
+ */
+ private List<ASTNode> findWhenClauses(ASTNode tree) throws SemanticException {
+ assert tree.getType() == HiveParser.TOK_MERGE;
+ List<ASTNode> whenClauses = new ArrayList<>();
+ for(int idx = 3; idx < tree.getChildCount(); idx++) {
+ ASTNode whenClause = (ASTNode)tree.getChild(idx);
+ assert whenClause.getType() == HiveParser.TOK_MATCHED ||
+ whenClause.getType() == HiveParser.TOK_NOT_MATCHED :
+ "Unexpected node type found: " + whenClause.getType() + addParseInfo(whenClause);
+ whenClauses.add(whenClause);
+ }
+ if(whenClauses.size() <= 0) {
+ //Futureproofing: the parser will actually not allow this
+ throw new SemanticException("Must have at least 1 WHEN clause in MERGE statement");
+ }
+ return whenClauses;
+ }
+ private ASTNode getWhenClauseOperation(ASTNode whenClause) {
+ if(!(whenClause.getType() == HiveParser.TOK_MATCHED || whenClause.getType() == HiveParser.TOK_NOT_MATCHED)) {
+ throw raiseWrongType("Expected TOK_MATCHED|TOK_NOT_MATCHED", whenClause);
+ }
+ return (ASTNode) whenClause.getChild(0);
+ }
+ /**
+ * returns the <boolean predicate> as in WHEN MATCHED AND <boolean predicate> THEN...
+ * @return may be null
+ */
+ private String getWhenClausePredicate(ASTNode whenClause) {
+ if(!(whenClause.getType() == HiveParser.TOK_MATCHED || whenClause.getType() == HiveParser.TOK_NOT_MATCHED)) {
+ throw raiseWrongType("Expected TOK_MATCHED|TOK_NOT_MATCHED", whenClause);
+ }
+ if(whenClause.getChildCount() == 2) {
+ return ((ASTNode)whenClause.getChild(1)).getMatchedText();
+ }
+ return null;
+ }
+ /**
+ * Generates the Insert leg of the multi-insert SQL to represent WHEN NOT MATCHED THEN INSERT clause
+ * @param targetTableNameInSourceQuery - simple name/alias
+ * @throws SemanticException
+ */
+ private void handleInsert(ASTNode whenNotMatchedClause, StringBuilder rewrittenQueryStr, ASTNode target,
+ ASTNode onClause, Table targetTable,
+ String targetTableNameInSourceQuery) throws SemanticException{
+ assert whenNotMatchedClause.getType() == HiveParser.TOK_NOT_MATCHED;
+ assert getWhenClauseOperation(whenNotMatchedClause).getType() == HiveParser.TOK_INSERT;
+ List<FieldSchema> partCols = targetTable.getPartCols();
+
+ String valuesClause = ((ASTNode)getWhenClauseOperation(whenNotMatchedClause).getChild(0))
+ .getMatchedText();
+ valuesClause = valuesClause.substring(1, valuesClause.length() - 1);
+ rewrittenQueryStr.append("INSERT INTO ").append(getFullTableNameForSQL(target));
+ addPartitionColsToInsert(partCols, rewrittenQueryStr);
+
+ OnClauseAnalyzer oca = new OnClauseAnalyzer(onClause, targetTable, targetTableNameInSourceQuery);
+ oca.analyze();
+ rewrittenQueryStr.append("\n select ")
+ .append(valuesClause).append("\n WHERE ").append(oca.getPredicate());
+ String extraPredicate = getWhenClausePredicate(whenNotMatchedClause);
+ if(extraPredicate != null) {
+ //we have WHEN NOT MATCHED AND <boolean expr> THEN INSERT
+ rewrittenQueryStr.append(" AND ")
+ .append(((ASTNode)whenNotMatchedClause.getChild(1)).getMatchedText()).append('\n');
+ }
+ }
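A tiny sketch of the VALUES-clause splicing above: the matched text of the value row arrives parenthesized, and the outer pair is stripped so the expressions can be reused as a select list. The example values are hypothetical.

public class ValuesClauseSketch {
  public static void main(String[] args) {
    // getMatchedText() on the value row yields "(expr1, expr2, ...)"
    String valuesClause = "(source.a2, source.b2)";
    String selectList = valuesClause.substring(1, valuesClause.length() - 1);
    System.out.println("select " + selectList); // select source.a2, source.b2
  }
}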
+ /**
+ * Suppose the input Merge statement has ON target.a = source.b and c = d. Assume that 'c' is from
+ * the target table and 'd' is from the source expression. In order to properly
+ * generate the Insert for WHEN NOT MATCHED THEN INSERT, we need to make sure that the Where
+ * clause of this Insert contains "target.a is null and target.c is null". This ensures that this
+ * Insert leg does not receive any rows that are processed by the Inserts corresponding to the
+ * WHEN MATCHED THEN ... clauses. (Implicit in this is a mini resolver that figures out if an
+ * unqualified column is part of the target table. We can get away with this simple logic because
+ * we know that the target is always a table, as opposed to some derived table.)
+ * The job of this class is to generate this predicate.
+ *
+ * Note that this predicate cannot simply be NOT(on-clause-expr). If on-clause-expr evaluates
+ * to Unknown, it will be treated as False in the WHEN MATCHED Inserts, but NOT(Unknown) = Unknown,
+ * and so it would be False for the WHEN NOT MATCHED Insert as well...
+ */
+ private static final class OnClauseAnalyzer {
+ private final ASTNode onClause;
+ private final Map<String, List<String>> table2column = new HashMap<>();
+ private final List<String> unresolvedColumns = new ArrayList<>();
+ private final List<FieldSchema> allTargetTableColumns = new ArrayList<>();
+ private final Set<String> tableNamesFound = new HashSet<>();
+ private final String targetTableNameInSourceQuery;
+ /**
+ * @param targetTableNameInSourceQuery alias or simple name
+ */
+ OnClauseAnalyzer(ASTNode onClause, Table targetTable, String targetTableNameInSourceQuery) {
+ this.onClause = onClause;
+ allTargetTableColumns.addAll(targetTable.getCols());
+ allTargetTableColumns.addAll(targetTable.getPartCols());
+ this.targetTableNameInSourceQuery = unescapeIdentifier(targetTableNameInSourceQuery);
+ }
+ /**
+ * finds all columns and groups by table ref (if there is one)
+ */
+ private void visit(ASTNode n) {
+ if(n.getType() == HiveParser.TOK_TABLE_OR_COL) {
+ ASTNode parent = (ASTNode) n.getParent();
+ if(parent != null && parent.getType() == HiveParser.DOT) {
+ //the ref must be a table, so look for column name as right child of DOT
+ if(parent.getParent() != null && parent.getParent().getType() == HiveParser.DOT) {
+ //I don't think this can happen... but just in case
+ throw new IllegalArgumentException("Found unexpected db.table.col reference in " + onClause.getMatchedText());
+ }
+ addColumn2Table(n.getChild(0).getText(), parent.getChild(1).getText());
+ }
+ else {
+ //must be just a column name
+ unresolvedColumns.add(n.getChild(0).getText());
+ }
+ }
+ if(n.getChildCount() == 0) {
+ return;
+ }
+ for(Node child : n.getChildren()) {
+ visit((ASTNode)child);
+ }
+ }
+ private void analyze() {
+ visit(onClause);
+ int numTableRefs = tableNamesFound.size();
+ if(numTableRefs > 2) {
+ throw new IllegalArgumentException("Found > 2 table refs in ON clause. Found " +
+ tableNamesFound + " in " + onClause.getMatchedText());
+ }
+ handleUnresolvedColumns();
+ if(tableNamesFound.size() > 2) {
+ throw new IllegalArgumentException("Found > 2 table refs in ON clause (incl unresolved). " +
+ "Found " + tableNamesFound + " in " + onClause.getMatchedText());
+ }
+ }
+ /**
+ * Find those that belong to target table
+ */
+ private void handleUnresolvedColumns() {
+ if(unresolvedColumns.isEmpty()) { return; }
+ for(String c : unresolvedColumns) {
+ for(FieldSchema fs : allTargetTableColumns) {
+ if(c.equalsIgnoreCase(fs.getName())) {
+ //c belongs to target table; strictly speaking there may be an ambiguous ref but
+ //this will be caught later when multi-insert is parsed
+ addColumn2Table(targetTableNameInSourceQuery.toLowerCase(), c);
+ break;
+ }
+ }
+ }
+ }
+ private void addColumn2Table(String tableName, String columnName) {
+ tableName = tableName.toLowerCase();//normalize name for mapping
+ tableNamesFound.add(tableName);
+ List<String> cols = table2column.get(tableName);
+ if(cols == null) {
+ cols = new ArrayList<>();
+ table2column.put(tableName, cols);
+ }
+ //we want to preserve 'columnName' as it was in the original input query so that the rewrite
+ //looks as much as possible like the original query
+ cols.add(columnName);
+ }
+ /**
+ * Now generate the predicate for Where clause
+ */
+ private String getPredicate() {
+ //normalize table name for mapping
+ List<String> targetCols = table2column.get(targetTableNameInSourceQuery.toLowerCase());
+ StringBuilder sb = new StringBuilder();
+ for(String col : targetCols) {
+ if(sb.length() > 0) {
+ sb.append(" AND ");
+ }
+ //but preserve table name in SQL
+ sb.append(targetTableNameInSourceQuery).append(".").append(col).append(" IS NULL");
+ }
+ return sb.toString();
+ }
+ }
}
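To make the three-valued-logic point above concrete, here is a standalone sketch of the predicate that getPredicate() assembles for the WHEN NOT MATCHED leg; the table and column names are illustrative.

import java.util.Arrays;
import java.util.List;

public class NotMatchedPredicateSketch {
  // Mirrors OnClauseAnalyzer.getPredicate(): AND together "alias.col IS NULL"
  // for every target-table column referenced in the ON clause.
  static String isNullPredicate(String targetAlias, List<String> targetCols) {
    StringBuilder sb = new StringBuilder();
    for (String col : targetCols) {
      if (sb.length() > 0) sb.append(" AND ");
      sb.append(targetAlias).append('.').append(col).append(" IS NULL");
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    // ON target.a = source.b2 AND c = d, where unqualified "c" resolves to the target table
    System.out.println(isNullPredicate("target", Arrays.asList("a", "c")));
    // target.a IS NULL AND target.c IS NULL
  }
}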
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
index eafba21..60858e6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
@@ -516,7 +516,7 @@ public class CreateTableDesc extends DDLDesc implements Serializable {
}
}
if (!found) {
- throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg());
+ throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg(" \'" + bucketCol + "\'"));
}
}
}
@@ -536,7 +536,7 @@ public class CreateTableDesc extends DDLDesc implements Serializable {
}
}
if (!found) {
- throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg());
+ throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg(" \'" + sortCol + "\'"));
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index f513d0f..f8ae86b 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -417,6 +417,7 @@ public class TestCompactionTxnHandler {
long txnId = openTxns.getTxn_ids().get(0);
// lock a table, as in dynamic partitions
LockComponent lc = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, dbName);
+ lc.setIsDynamicPartitionWrite(true);
lc.setTablename(tableName);
DataOperationType dop = DataOperationType.UPDATE;
lc.setOperationType(dop);
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 64baa9f..68af15a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService;
@@ -580,4 +581,38 @@ public class TestTxnCommands {
runStatementOnDriver("ALTER TABLE ex2.exchange_part_test2 ADD PARTITION (ds='2013-04-05')");
runStatementOnDriver("ALTER TABLE ex1.exchange_part_test1 EXCHANGE PARTITION (ds='2013-04-05') WITH TABLE ex2.exchange_part_test2");
}
+ @Test
+ public void testMergeNegative() throws Exception {
+ CommandProcessorResponse cpr = runStatementOnDriverNegative("MERGE INTO " + Table.ACIDTBL +
+ " target USING " + Table.NONACIDORCTBL +
+ " source\nON target.a = source.a " +
+ "\nWHEN MATCHED THEN UPDATE set b = 1 " +
+ "\nWHEN MATCHED THEN DELETE " +
+ "\nWHEN NOT MATCHED AND a < 1 THEN INSERT VALUES(1,2)");
+ Assert.assertEquals(ErrorMsg.MERGE_PREDIACTE_REQUIRED, ((HiveException)cpr.getException()).getCanonicalErrorMsg());
+ }
+ @Test
+ public void testMergeNegative2() throws Exception {
+ CommandProcessorResponse cpr = runStatementOnDriverNegative("MERGE INTO "+ Table.ACIDTBL +
+ " target USING " + Table.NONACIDORCTBL + "\n source ON target.pk = source.pk " +
+ "\nWHEN MATCHED THEN UPDATE set t = 1 " +
+ "\nWHEN MATCHED THEN UPDATE set b=a");
+ Assert.assertEquals(ErrorMsg.MERGE_TOO_MANY_UPDATE, ((HiveException)cpr.getException()).getCanonicalErrorMsg());
+ }
+ @Ignore
+ @Test
+ public void testSpecialChar() throws Exception {
+ String target = "`aci/d_u/ami`";
+ String src = "`src/name`";
+ runStatementOnDriver("drop table if exists " + target);
+ runStatementOnDriver("drop table if exists " + src);
+ runStatementOnDriver("create table " + target + "(i int," +
+ "`d?*de e` decimal(5,2)," +
+ "vc varchar(128)) clustered by (i) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ runStatementOnDriver("create table " + src + "(`g/h` int, j decimal(5,2), k varchar(128))");
+ runStatementOnDriver("merge into " + target + " as `d/8` using " + src + " as `a/b` on i=`g/h` " +
+ "\nwhen matched and i > 5 then delete " +
+ "\nwhen matched then update set vc=`\u2206\u220b` " +
+ "\nwhen not matched then insert values(`a/b`.`g/h`,`a/b`.j,`a/b`.k)");
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 949e071..49ba667 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -64,12 +64,15 @@ import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* TODO: this should be merged with TestTxnCommands once that is checked in
* specifically the tests; the supporting code here is just a clone of TestTxnCommands
*/
public class TestTxnCommands2 {
+ private static final Logger LOG = LoggerFactory.getLogger(TestTxnCommands2.class);
protected static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
File.separator + TestTxnCommands2.class.getCanonicalName()
+ "-" + System.currentTimeMillis()
@@ -86,7 +89,9 @@ public class TestTxnCommands2 {
ACIDTBL("acidTbl"),
ACIDTBLPART("acidTblPart"),
NONACIDORCTBL("nonAcidOrcTbl"),
- NONACIDPART("nonAcidPart");
+ NONACIDPART("nonAcidPart"),
+ NONACIDPART2("nonAcidPart2"),
+ ACIDNESTEDPART("acidNestedPart");
private final String name;
@Override
@@ -126,11 +131,17 @@ public class TestTxnCommands2 {
}
SessionState.start(new SessionState(hiveConf));
d = new Driver(hiveConf);
+ d.setMaxRows(10000);
dropTables();
runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES (" + tableProperties + ")");
runStatementOnDriver("create table " + Table.ACIDTBLPART + "(a int, b int) partitioned by (p string) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES (" + tableProperties + ")");
runStatementOnDriver("create table " + Table.NONACIDORCTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
runStatementOnDriver("create table " + Table.NONACIDPART + "(a int, b int) partitioned by (p string) stored as orc TBLPROPERTIES ('transactional'='false')");
+ runStatementOnDriver("create table " + Table.NONACIDPART2 +
+ "(a2 int, b2 int) partitioned by (p2 string) stored as orc TBLPROPERTIES ('transactional'='false')");
+ runStatementOnDriver("create table " + Table.ACIDNESTEDPART +
+ "(a int, b int) partitioned by (p int, q int) clustered by (a) into " + BUCKET_COUNT +
+ " buckets stored as orc TBLPROPERTIES (" + tableProperties + ")");
}
protected void dropTables() throws Exception {
@@ -1333,6 +1344,409 @@ public class TestTxnCommands2 {
String[] expectedResult = { "1\tfoo\tNULL", "2\tbar\tNULL" };
Assert.assertEquals(Arrays.asList(expectedResult), rs);
}
+ /**
+ * Test that ACID works with multi-insert statements
+ */
+ @Test
+ public void testMultiInsertStatement() throws Exception {
+ int[][] sourceValsOdd = {{5,5},{11,11}};
+ int[][] sourceValsEven = {{2,2}};
+ //populate source
+ runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='odd') " + makeValuesClause(sourceValsOdd));
+ runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='even') " + makeValuesClause(sourceValsEven));
+ int[][] targetValsOdd = {{5,6},{7,8}};
+ int[][] targetValsEven = {{2,1},{4,3}};
+ //populate target
+ runStatementOnDriver("insert into " + Table.ACIDTBLPART + " PARTITION(p='odd') " + makeValuesClause(targetValsOdd));
+ runStatementOnDriver("insert into " + Table.ACIDTBLPART + " PARTITION(p='even') " + makeValuesClause(targetValsEven));
+ List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBLPART + " order by a,b");
+ int[][] targetVals = {{2,1},{4,3},{5,6},{7,8}};
+ Assert.assertEquals(stringifyValues(targetVals), r);
+ //currently multi-insert doesn't allow the same table/partition in > 1 output branch
+ String s = "from " + Table.ACIDTBLPART + " target right outer join " +
+ Table.NONACIDPART2 + " source on target.a = source.a2 " +
+ " INSERT INTO TABLE " + Table.ACIDTBLPART + " PARTITION(p='even') select source.a2, source.b2 where source.a2=target.a " +
+ " insert into table " + Table.ACIDTBLPART + " PARTITION(p='odd') select source.a2,source.b2 where target.a is null";
+ //r = runStatementOnDriver("explain formatted " + s);
+ //LOG.info("Explain formatted: " + r.toString());
+ runStatementOnDriver(s);
+ r = runStatementOnDriver("select a,b from " + Table.ACIDTBLPART + " where p='even' order by a,b");
+ int[][] rExpected = {{2,1},{2,2},{4,3},{5,5}};
+ Assert.assertEquals(stringifyValues(rExpected), r);
+ r = runStatementOnDriver("select a,b from " + Table.ACIDTBLPART + " where p='odd' order by a,b");
+ int[][] rExpected2 = {{5,6},{7,8},{11,11}};
+ Assert.assertEquals(stringifyValues(rExpected2), r);
+ }
+ /**
+ * check that we can specify insert columns
+ *
+ * Need to figure out semantics: if a row from the base expr ends up in both the Update and Delete
+ * clauses, we'll write the Update event to 1 delta and the Delete to another. Given that we collapse
+ * events for the same current txn for different stmt ids to the latest one, the delete will win.
+ * In Acid 2.0 we'll end up with 2 Delete events for the same PK. Logically this should be OK, but it
+ * may break the vectorized reader impl.... need to check
+ *
+ * 1:M from target to source results in an ambiguous write to target - the SQL Standard expects an
+ * error; see the illustration below this comment. (I have an argument on how to solve this with
+ * minor mods to the Join operator written down somewhere)
+ *
+ * Only need 1 Stats task for MERGE (currently we get 1 per branch).
+ * Should also eliminate the Move task - that's a general ACID task
+ */
+ private void logResults(List<String> r, String header, String prefix) {
+ LOG.info(prefix + " " + header);
+ StringBuilder sb = new StringBuilder();
+ int numLines = 0;
+ for(String line : r) {
+ numLines++;
+ sb.append(prefix).append(line).append("\n");
+ }
+ LOG.info(sb.toString());
+ LOG.info(prefix + " Printed " + numLines + " lines");
+ }
+
+
+ /**
+ * This tests that we handle a non-trivial ON clause correctly
+ * @throws Exception
+ */
+ @Test
+ public void testMerge() throws Exception {
+ int[][] baseValsOdd = {{5,5},{11,11}};
+ runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='odd') " + makeValuesClause(baseValsOdd));
+ int[][] vals = {{2,1},{4,3},{5,6},{7,8}};
+ runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals));
+ List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ Assert.assertEquals(stringifyValues(vals), r);
+ String query = "merge into " + Table.ACIDTBL +
+ " using " + Table.NONACIDPART2 + " source ON " + Table.ACIDTBL + ".a = a2 and b + 1 = source.b2 + 1 " +
+ "WHEN MATCHED THEN UPDATE set b = source.b2 " +
+ "WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2)";
+ runStatementOnDriver(query);
+
+ r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ int[][] rExpected = {{2,1},{4,3},{5,5},{5,6},{7,8},{11,11}};
+ Assert.assertEquals(stringifyValues(rExpected), r);
+ }
+ @Test
+ public void testMergeWithPredicate() throws Exception {
+ int[][] baseValsOdd = {{2,2},{5,5},{8,8},{11,11}};
+ runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='odd') " + makeValuesClause(baseValsOdd));
+ int[][] vals = {{2,1},{4,3},{5,6},{7,8}};
+ runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals));
+ List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ Assert.assertEquals(stringifyValues(vals), r);
+ String query = "merge into " + Table.ACIDTBL +
+ " t using " + Table.NONACIDPART2 + " s ON t.a = s.a2 " +
+ "WHEN MATCHED AND t.b between 1 and 3 THEN UPDATE set b = s.b2 " +
+ "WHEN NOT MATCHED and s.b2 >= 11 THEN INSERT VALUES(s.a2, s.b2)";
+ runStatementOnDriver(query);
+
+ r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ int[][] rExpected = {{2,2},{4,3},{5,6},{7,8},{11,11}};
+ Assert.assertEquals(stringifyValues(rExpected), r);
+ }
+
+ /**
+ * Test combines update + insert clauses
+ * @throws Exception
+ */
+ @Test
+ public void testMerge2() throws Exception {
+ int[][] baseValsOdd = {{5,5},{11,11}};
+ int[][] baseValsEven = {{2,2},{4,44}};
+ runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='odd') " + makeValuesClause(baseValsOdd));
+ runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='even') " + makeValuesClause(baseValsEven));
+ int[][] vals = {{2,1},{4,3},{5,6},{7,8}};
+ runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals));
+ List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ Assert.assertEquals(stringifyValues(vals), r);
+ String query = "merge into " + Table.ACIDTBL +
+ " using " + Table.NONACIDPART2 + " source ON " + Table.ACIDTBL + ".a = source.a2 " +
+ "WHEN MATCHED THEN UPDATE set b = source.b2 " +
+ "WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2) ";//AND b < 1
+ r = runStatementOnDriver(query);
+ //r = runStatementOnDriver("explain " + query);
+ //logResults(r, "Explain logical1", "");
+
+ r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ int[][] rExpected = {{2,2},{4,44},{5,5},{7,8},{11,11}};
+ Assert.assertEquals(stringifyValues(rExpected), r);
+ }
+
+ /**
+ * Test combines delete + insert clauses
+ * @throws Exception
+ */
+ @Test
+ public void testMerge3() throws Exception {
+ int[][] baseValsOdd = {{5,5},{11,11}};
+ int[][] baseValsEven = {{2,2},{4,44}};
+ runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='odd') " + makeValuesClause(baseValsOdd));
+ runStatementOnDriver("insert into " + Table.NONACIDPART2 + " PARTITION(p2='even') " + makeValuesClause(baseValsEven));
+ int[][] vals = {{2,1},{4,3},{5,6},{7,8}};
+ runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals));
+ List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ Assert.assertEquals(stringifyValues(vals), r);
+ String query = "merge into " + Table.ACIDTBL +
+ " using " + Table.NONACIDPART2 + " source ON " + Table.ACIDTBL + ".a = source.a2 " +
+ "WHEN MATCHED THEN DELETE " +
+ "WHEN NOT MATCHED THEN INSERT VALUES(source.a2, source.b2) ";
+ runStatementOnDriver(query);
+
+ r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ int[][] rExpected = {{7,8},{11,11}};
+ Assert.assertEquals(stringifyValues(rExpected), r);
+ }
+ /**
+ * https://hortonworks.jira.com/browse/BUG-66580
+ * @throws Exception
+ */
+ @Ignore
+ @Test
+ public void testMultiInsert() throws Exception {
+ runStatementOnDriver("create table if not exists srcpart (a int, b int, c int) " +
+ "partitioned by (z int) clustered by (a) into 2 buckets " +
+ "stored as orc tblproperties('transactional'='true')");
+ runStatementOnDriver("create temporary table if not exists data1 (x int)");
+// runStatementOnDriver("create temporary table if not exists data2 (x int)");
+
+ runStatementOnDriver("insert into data1 values (1),(2),(3)");
+// runStatementOnDriver("insert into data2 values (4),(5),(6)");
+ d.destroy();
+ hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+ d = new Driver(hiveConf);
+ List<String> r = runStatementOnDriver(" from data1 " +
+ "insert into srcpart partition(z) select 0,0,1,x " +
+ "insert into srcpart partition(z=1) select 0,0,1");
+ }
+ /**
+ * Investigating DP and WriteEntity, etc
+ * @throws Exception
+ */
+ @Test
+ @Ignore
+ public void testDynamicPartitions() throws Exception {
+ d.destroy();
+ hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+ //In DbTxnManager.acquireLocks() we have
+ // 1 ReadEntity: default@values__tmp__table__1
+ // 1 WriteEntity: default@acidtblpart Type=TABLE WriteType=INSERT isDP=false
+ runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,1,'p1'),(2,2,'p1'),(3,3,'p1'),(4,4,'p2')");
+
+ List<String> r1 = runStatementOnDriver("select count(*) from " + Table.ACIDTBLPART);
+ Assert.assertEquals("4", r1.get(0));
+ //In DbTxnManager.acquireLocks() we have
+ // 2 ReadEntity: [default@acidtblpart@p=p1, default@acidtblpart]
+ // 1 WriteEntity: default@acidtblpart Type=TABLE WriteType=INSERT isDP=false
+ //todo: side note on the above: LockRequestBuilder combines both default@acidtblpart entries into 1
+ runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) select * from " + Table.ACIDTBLPART + " where p='p1'");
+
+ //In DbTxnManager.acquireLocks() we have
+ // 2 ReadEntity: [default@acidtblpart@p=p1, default@acidtblpart]
+ // 1 WriteEntity: default@acidtblpart@p=p2 Type=PARTITION WriteType=INSERT isDP=false
+ runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p='p2') select a,b from " + Table.ACIDTBLPART + " where p='p1'");
+
+ //In UpdateDeleteSemanticAnalyzer, after super analyze
+ // 3 ReadEntity: [default@acidtblpart, default@acidtblpart@p=p1, default@acidtblpart@p=p2]
+ // 1 WriteEntity: [default@acidtblpart TABLE/INSERT]
+ //after UDSA
+ // Read [default@acidtblpart, default@acidtblpart@p=p1, default@acidtblpart@p=p2]
+ // Write [default@acidtblpart@p=p1, default@acidtblpart@p=p2] - PARTITION/UPDATE, PARTITION/UPDATE
+ //todo: Why acquire per-partition locks - if you have many partitions that's hugely inefficient.
+ //could acquire 1 table-level Shared_write instead
+ runStatementOnDriver("update " + Table.ACIDTBLPART + " set b = 1");
+
+ //In UpdateDeleteSemanticAnalyzer, after super analyze
+ // Read [default@acidtblpart, default@acidtblpart@p=p1]
+ // Write default@acidtblpart TABLE/INSERT
+ //after UDSA
+ // Read [default@acidtblpart, default@acidtblpart@p=p1]
+ // Write [default@acidtblpart@p=p1] PARTITION/UPDATE
+ //todo: this causes a Read lock on the whole table - clearly overkill
+ //for Update/Delete we only ever write (at most) the partitions we read
+ runStatementOnDriver("update " + Table.ACIDTBLPART + " set b = 1 where p='p1'");
+ }
+ @Test
+ public void testDynamicPartitionsMerge() throws Exception {
+ d.destroy();
+ hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+ runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,1,'p1'),(2,2,'p1'),(3,3,'p1'),(4,4,'p2')");
+
+ List<String> r1 = runStatementOnDriver("select count(*) from " + Table.ACIDTBLPART);
+ Assert.assertEquals("4", r1.get(0));
+ int[][] sourceVals = {{2,15},{4,44},{5,5},{11,11}};
+ runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(sourceVals));
+ runStatementOnDriver("merge into " + Table.ACIDTBLPART + " using " + Table.NONACIDORCTBL +
+ " as s ON " + Table.ACIDTBLPART + ".a = s.a " +
+ "when matched then update set b = s.b " +
+ "when not matched then insert values(s.a, s.b, 'new part')");
+ r1 = runStatementOnDriver("select p,a,b from " + Table.ACIDTBLPART + " order by p, a, b");
+ String result = r1.toString();
+ Assert.assertEquals("[new part\t5\t5, new part\t11\t11, p1\t1\t1, p1\t2\t15, p1\t3\t3, p2\t4\t44]", result);
+ }
+ /**
+ * Using nested partitions and thus DummyPartition
+ * @throws Exception
+ */
+ @Test
+ public void testDynamicPartitionsMerge2() throws Exception {
+ d.destroy();
+ hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+ int[][] targetVals = {{1,1,1},{2,2,2},{3,3,1},{4,4,2}};
+ runStatementOnDriver("insert into " + Table.ACIDNESTEDPART + " partition(p=1,q) " + makeValuesClause(targetVals));
+
+ List<String> r1 = runStatementOnDriver("select count(*) from " + Table.ACIDNESTEDPART);
+ Assert.assertEquals("4", r1.get(0));
+ int[][] sourceVals = {{2,15},{4,44},{5,5},{11,11}};
+ runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(sourceVals));
+ runStatementOnDriver("merge into " + Table.ACIDNESTEDPART + " using " + Table.NONACIDORCTBL +
+ " as s ON " + Table.ACIDNESTEDPART + ".a = s.a " +
+ "when matched then update set b = s.b " +
+ "when not matched then insert values(s.a, s.b, 3,4)");
+ r1 = runStatementOnDriver("select p,q,a,b from " + Table.ACIDNESTEDPART + " order by p,q, a, b");
+ Assert.assertEquals(stringifyValues(new int[][] {{1,1,1,1},{1,1,3,3},{1,2,2,15},{1,2,4,44},{3,4,5,5},{3,4,11,11}}), r1);
+ }
+ @Ignore("Covered elsewhere")
+ @Test
+ public void testMergeAliasedTarget() throws Exception {
+ int[][] baseValsOdd = {{2,2},{4,44},{5,5},{11,11}};
+ runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(baseValsOdd));
+ int[][] vals = {{2,1},{4,3},{5,6},{7,8}};
+ runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals));
+ String query = "merge into " + Table.ACIDTBL +
+ " as target using " + Table.NONACIDORCTBL + " source ON target.a = source.a " +
+ "WHEN MATCHED THEN update set b = 0 " +
+ "WHEN NOT MATCHED THEN INSERT VALUES(source.a, source.b) ";
+ runStatementOnDriver(query);
+
+ List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ int[][] rExpected = {{2,0},{4,0},{5,0},{7,8},{11,11}};
+ Assert.assertEquals(stringifyValues(rExpected), r);
+ }
+ @Test
+ public void testMergeUpdateDelete() throws Exception {
+ int[][] baseValsOdd = {{2,2},{4,44},{5,5},{11,11}};
+ runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(baseValsOdd));
+ int[][] vals = {{2,1},{4,3},{5,6},{7,8}};
+ runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(vals));
+ String query = "merge into " + Table.ACIDTBL +
+ " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " +
+ "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " +
+ "WHEN MATCHED and t.a > 3 and t.a < 5 THEN DELETE " +
+ "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) ";
+ runStatementOnDriver(query);
+
+ List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ int[][] rExpected = {{2,0},{5,6},{7,8},{11,11}};
+ Assert.assertEquals(stringifyValues(rExpected), r);
+ }
+ @Test
+ public void testMergeDeleteUpdate() throws Exception {
+ int[][] sourceVals = {{2,2},{4,44},{5,5},{11,11}};
+ runStatementOnDriver("insert into " + Table.NONACIDORCTBL + " " + makeValuesClause(sourceVals));
+ int[][] targetVals = {{2,1},{4,3},{5,6},{7,8}};
+ runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(targetVals));
+ String query = "merge into " + Table.ACIDTBL +
+ " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " +
+ "WHEN MATCHED and s.a < 5 THEN DELETE " +
+ "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " +
+ "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) ";
+ runStatementOnDriver(query);
+
+ List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ int[][] rExpected = {{5,6},{7,8},{11,11}};
+ Assert.assertEquals(stringifyValues(rExpected), r);
+ }
+
+ /**
+ * https://www.linkedin.com/pulse/how-load-slowly-changing-dimension-type-2-using-oracle-padhy
+ */
+ @Test
+ public void testMergeType2SCD01() throws Exception {
+ runStatementOnDriver("drop table if exists target");
+ runStatementOnDriver("drop table if exists source");
+ runStatementOnDriver("drop table if exists splitTable");
+
+ runStatementOnDriver("create table splitTable(op int)");
+ runStatementOnDriver("insert into splitTable values (0),(1)");
+ runStatementOnDriver("create table source (key int, data int)");
+ runStatementOnDriver("create table target (key int, data int, cur int) clustered by (key) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ int[][] targetVals = {{1, 5, 1}, {2, 6, 1}, {1, 18, 0}};
+ runStatementOnDriver("insert into target " + makeValuesClause(targetVals));
+ int[][] sourceVals = {{1, 7}, {3, 8}};
+ runStatementOnDriver("insert into source " + makeValuesClause(sourceVals));
+ //augment source with a col which has 1 if it will cause an update in target, 0 otherwise
+ String curMatch = "select s.*, case when t.cur is null then 0 else 1 end m from source s left outer join (select * from target where target.cur=1) t on s.key=t.key";
+ //split each row (duplicate) which will cause an update into 2 rows and augment with 'op' col which has 0 to insert, 1 to update
+ String teeCurMatch = "select curMatch.*, case when splitTable.op is null or splitTable.op = 0 then 0 else 1 end op from (" + curMatch + ") curMatch left outer join splitTable on curMatch.m=1";
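+ //illustration (derived from the values inserted above): curMatch yields (1,7,1) and
+ //(3,8,0); teeCurMatch then tees the m=1 row into (1,7,1,0) and (1,7,1,1) while the
+ //unmatched row stays as a single (3,8,0,0)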
+ if(false) {
+ //this is just for debug
+ List<String> r1 = runStatementOnDriver(curMatch);
+ List<String> r2 = runStatementOnDriver(teeCurMatch);
+ }
+ String stmt = "merge into target t using (" + teeCurMatch + ") s on t.key=s.key and t.cur=1 and s.op=1 " +
+ "when matched then update set cur=0 " +
+ "when not matched then insert values(s.key,s.data,1)";
+
+ runStatementOnDriver(stmt);
+ int[][] resultVals = {{1,5,0},{1,7,1},{1,18,0},{2,6,1},{3,8,1}};
+ List<String> r = runStatementOnDriver("select * from target order by key,data,cur");
+ Assert.assertEquals(stringifyValues(resultVals), r);
+ }
+ /**
+ * https://www.linkedin.com/pulse/how-load-slowly-changing-dimension-type-2-using-oracle-padhy
+ * Same as testMergeType2SCD01 but with a more intuitive "source" expression
+ */
+ @Test
+ public void testMergeType2SCD02() throws Exception {
+ runStatementOnDriver("drop table if exists target");
+ runStatementOnDriver("drop table if exists source");
+ runStatementOnDriver("create table source (key int, data int)");
+ runStatementOnDriver("create table target (key int, data int, cur int) clustered by (key) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ int[][] targetVals = {{1, 5, 1}, {2, 6, 1}, {1, 18, 0}};
+ runStatementOnDriver("insert into target " + makeValuesClause(targetVals));
+ int[][] sourceVals = {{1, 7}, {3, 8}};
+ runStatementOnDriver("insert into source " + makeValuesClause(sourceVals));
+
+ String baseSrc = "select source.*, 0 c from source " +
+ "union all " +
+ "select source.*, 1 c from source " +
+ "inner join target " +
+ "on source.key=target.key where target.cur=1";
+ if(false) {
+ //this is just for debug
+ List<String> r1 = runStatementOnDriver(baseSrc);
+ List<String> r2 = runStatementOnDriver(
+ "select t.*, s.* from target t right outer join (" + baseSrc + ") s " +
+ "\non t.key=s.key and t.cur=s.c and t.cur=1");
+ }
+ String stmt = "merge into target t using " +
+ "(" + baseSrc + ") s " +
+ "on t.key=s.key and t.cur=s.c and t.cur=1 " +
+ "when matched then update set cur=0 " +
+ "when not matched then insert values(s.key,s.data,1)";
+
+ runStatementOnDriver(stmt);
+ int[][] resultVals = {{1,5,0},{1,7,1},{1,18,0},{2,6,1},{3,8,1}};
+ List<String> r = runStatementOnDriver("select * from target order by key,data,cur");
+ Assert.assertEquals(stringifyValues(resultVals), r);
+ }
+
+ @Test
+ @Ignore("Values clause with table constructor not yet supported")
+ public void testValuesSource() throws Exception {
+ int[][] targetVals = {{2,1},{4,3},{5,6},{7,8}};
+ runStatementOnDriver("insert into " + Table.ACIDTBL + " " + makeValuesClause(targetVals));
+ String query = "merge into " + Table.ACIDTBL +
+ " as t using (select * from (values (2,2),(4,44),(5,5),(11,11)) as F(a,b)) s ON t.a = s.a " +
+ "WHEN MATCHED and s.a < 5 THEN DELETE " +
+ "WHEN MATCHED AND s.a < 3 THEN update set b = 0 " +
+ "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) ";
+ runStatementOnDriver(query);
+
+ List<String> r = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " order by a,b");
+ int[][] rExpected = {{5,6},{7,8},{11,11}};
+ Assert.assertEquals(stringifyValues(rExpected), r);
+ }
/**
* takes raw data and turns it into a string as if from Driver.getResults()
@@ -1389,6 +1803,7 @@ public class TestTxnCommands2 {
}
protected List<String> runStatementOnDriver(String stmt) throws Exception {
+ LOG.info("+runStatementOnDriver(" + stmt + ")");
CommandProcessorResponse cpr = d.run(stmt);
if(cpr.getResponseCode() != 0) {
throw new RuntimeException(stmt + " failed: " + cpr);
http://git-wip-us.apache.org/repos/asf/hive/blob/e00f909d/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java
index c2330cb..c4dead8 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithSplitUpdate.java
@@ -18,21 +18,16 @@
package org.apache.hadoop.hive.ql;
-import java.io.File;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
-import org.apache.hadoop.hive.ql.io.HiveInputFormat;
-import org.apache.hadoop.hive.ql.session.SessionState;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -545,4 +540,10 @@ public class TestTxnCommands2WithSplitUpdate extends TestTxnCommands2 {
resultCount = 2;
Assert.assertEquals(resultCount, Integer.parseInt(rs.get(0)));
}
+ @Test
+ @Ignore
+ public void testMergeType2SCD01() throws Exception {}
+ @Test
+ @Ignore
+ public void testMergeType2SCD02() throws Exception {}
}