Posted to commits@hive.apache.org by ek...@apache.org on 2016/12/15 21:27:47 UTC
hive git commit: HIVE-15048 Update/Delete statement using wrong WriteEntity when subqueries are involved (Eugene Koifman, reviewed by Alan Gates)
Repository: hive
Updated Branches:
refs/heads/master 590687bc4 -> 08d4e124b
HIVE-15048 Update/Delete statement using wrong WriteEntity when subqueries are involved (Eugene Koifman, reviewed by Alan Gates)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/08d4e124
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/08d4e124
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/08d4e124
Branch: refs/heads/master
Commit: 08d4e124bc7685e22fa55d64893b7f9ca19156ab
Parents: 590687b
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Thu Dec 15 13:20:24 2016 -0800
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Thu Dec 15 13:20:54 2016 -0800
----------------------------------------------------------------------
.../apache/hadoop/hive/ql/hooks/ReadEntity.java | 6 +-
.../ql/parse/UpdateDeleteSemanticAnalyzer.java | 60 ++++-------
.../apache/hadoop/hive/ql/TestTxnCommands.java | 25 +++++
.../hive/ql/lockmgr/TestDbTxnManager2.java | 31 ------
.../test/queries/clientpositive/acid_subquery.q | 18 ++++
.../results/clientpositive/acid_subquery.q.out | 101 +++++++++++++++++++
6 files changed, 169 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/08d4e124/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
index b805904..9f52ebe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
@@ -53,7 +53,11 @@ public class ReadEntity extends Entity implements Serializable {
// important because in that case we shouldn't acquire a lock for it or authorize the read.
// These will be handled by the output to the table instead.
private boolean isUpdateOrDelete = false;
- //https://issues.apache.org/jira/browse/HIVE-15048
+ /**
+ * https://issues.apache.org/jira/browse/HIVE-15048
+ * It is possible that the same table is used in the top-level query and in a sub-query, e.g.
+ * select * from T where T.c in (select c from T inner join S on T.a=S.b)
+ */
public transient boolean isFromTopLevelQuery = true;
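
For context, here is a minimal, self-contained sketch of the locking rule this flag enables; the class and helper names below are simplified, hypothetical stand-ins, not the real org.apache.hadoop.hive.ql.hooks or DbTxnManager code. The idea: a read of the UPDATE/DELETE target at the top level is covered by the matching WriteEntity, but a read of the same table inside a sub-query still needs its own read lock.

import java.util.ArrayList;
import java.util.List;

class ReadLockSketch {
  static class Read {                  // stand-in for ReadEntity
    final String name;
    final boolean isUpdateOrDelete;    // read of the UPDATE/DELETE target table
    final boolean isFromTopLevelQuery; // false when the table appears only in a sub-query
    Read(String name, boolean upd, boolean top) {
      this.name = name; this.isUpdateOrDelete = upd; this.isFromTopLevelQuery = top;
    }
  }

  // Skip the read lock only for the top-level target read; the WriteEntity covers it.
  static List<String> readLocks(List<Read> inputs) {
    List<String> locks = new ArrayList<>();
    for (Read r : inputs) {
      if (r.isUpdateOrDelete && r.isFromTopLevelQuery) {
        continue; // lock is acquired via the corresponding WriteEntity instead
      }
      locks.add("SHARED_READ:" + r.name);
    }
    return locks;
  }

  public static void main(String[] args) {
    // update T set c = 1 where T.c in (select c from T inner join S on T.a = S.b)
    List<Read> inputs = new ArrayList<>();
    inputs.add(new Read("default@T", true, true));   // top-level target read: skipped
    inputs.add(new Read("default@T", true, false));  // same table, from the sub-query: locked
    inputs.add(new Read("default@S", false, false)); // joined source table: locked
    System.out.println(readLocks(inputs)); // [SHARED_READ:default@T, SHARED_READ:default@S]
  }
}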
http://git-wip-us.apache.org/repos/asf/hive/blob/08d4e124/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
index 05e1f43..79355ba 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
@@ -455,35 +455,8 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
useSuper = false;
}
- markReadEntityForUpdate();
+ updateOutputs(mTable);
- if (inputIsPartitioned(inputs)) {
- //todo: there are bugs here: https://issues.apache.org/jira/browse/HIVE-15048
- // In order to avoid locking the entire write table we need to replace the single WriteEntity
- // with a WriteEntity for each partition
- assert outputs.size() == 1 : "expected 1 WriteEntity. Got " + outputs;//this asserts comment above
- WriteEntity original = null;
- for(WriteEntity we : outputs) {
- original = we;
- }
- outputs.clear();
- for (ReadEntity input : inputs) {
- /**
- * The assumption here is that SemanticAnalyzer will will generate ReadEntity for each
- * partition that exists and is matched by the WHERE clause (which may be all of them).
- * Since we don't allow updating the value of a partition column, we know that we always
- * write the same (or fewer) partitions than we read. Still, the write is a Dynamic
- * Partition write - see HIVE-15032.
- */
- if (input.getTyp() == Entity.Type.PARTITION) {
- WriteEntity.WriteType writeType = deleting() ? WriteEntity.WriteType.DELETE :
- WriteEntity.WriteType.UPDATE;
- WriteEntity we = new WriteEntity(input.getPartition(), writeType);
- we.setDynamicPartitionWrite(original.isDynamicPartitionWrite());
- outputs.add(we);
- }
- }
- }
if (updating()) {
setUpAccessControlInfoForUpdate(mTable, setCols);
@@ -517,17 +490,6 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
return currentOperation.toString();
}
- private boolean inputIsPartitioned(Set<ReadEntity> inputs) {
- // We cannot simply look at the first entry, as in the case where the input is partitioned
- // there will be a table entry as well. So look for at least one partition entry.
- for (ReadEntity re : inputs) {
- if (re.getTyp() == Entity.Type.PARTITION) {
- return true;
- }
- }
- return false;
- }
-
// This method finds any columns on the right side of a set statement (thus rcols) and puts them
// in a set so we can add them to the list of input cols to check.
private void addSetRCols(ASTNode node, Set<String> setRCols) {
@@ -757,7 +719,17 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
} finally {
useSuper = false;
}
+ updateOutputs(targetTable);
+ }
+ /**
+ * SemanticAnalyzer will generate a WriteEntity for the target table since it doesn't know/check
+ * if the read and write are of the same table in "insert ... select ...". Since DbTxnManager
+ * uses Read/WriteEntity objects to decide which locks to acquire, we get more concurrency if we
+ * change the table WriteEntity to a set of partition WriteEntity objects based on
+ * ReadEntity objects computed for this table.
+ */
+ private void updateOutputs(Table targetTable) {
markReadEntityForUpdate();
if(targetTable.isPartitioned()) {
@@ -770,6 +742,13 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
WriteEntity.WriteType wt = we.getWriteType();
if(isTargetTable(we, targetTable) &&
(wt == WriteEntity.WriteType.UPDATE || wt == WriteEntity.WriteType.DELETE)) {
+ /**
+ * The assumption here is that SemanticAnalyzer will generate ReadEntity for each
+ * partition that exists and is matched by the WHERE clause (which may be all of them).
+ * Since we don't allow updating the value of a partition column, we know that we always
+ * write the same (or fewer) partitions than we read. Still, the write is a Dynamic
+ * Partition write - see HIVE-15032.
+ */
toRemove.add(we);
}
}
@@ -797,7 +776,8 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
* be able to not use DP for the Insert...
*
* Note that the Insert of Merge may be creating new partitions and writing to partitions
- * which were not read (WHEN NOT MATCHED...)
+ * which were not read (WHEN NOT MATCHED...). The WriteEntity for those should be created
+ * in MoveTask (or some other task that runs after the query completes).
*/
private List<ReadEntity> getRestrictedPartitionSet(Table targetTable) {
List<ReadEntity> partitionsRead = new ArrayList<>();
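
To make the new updateOutputs() flow concrete, here is a simplified, self-contained sketch of the rewrite it performs; the Entity type and entity-name strings are illustrative stand-ins for the real ReadEntity/WriteEntity classes, not the actual implementation. The single table-level UPDATE/DELETE output is replaced with one output per partition that was read, which is safe because partition columns cannot be updated, so a statement never writes a partition it did not read.

import java.util.LinkedHashSet;
import java.util.Set;

class OutputRewriteSketch {
  static class Entity {            // stand-in for ReadEntity/WriteEntity
    final String table;
    final String partition;        // null for a table-level entity
    Entity(String table, String partition) { this.table = table; this.partition = partition; }
    @Override public String toString() { return partition == null ? table : table + "@" + partition; }
  }

  // Replace the coarse table-level output for targetTable with per-partition outputs
  // derived from the partitions the analyzer read for that table.
  static Set<String> rewriteOutputs(String targetTable, Set<Entity> inputs, Set<Entity> outputs) {
    Set<String> rewritten = new LinkedHashSet<>();
    boolean droppedTableLevel = false;
    for (Entity out : outputs) {
      if (out.table.equals(targetTable) && out.partition == null) {
        droppedTableLevel = true;      // drop the whole-table write
        continue;
      }
      rewritten.add(out.toString());
    }
    if (droppedTableLevel) {
      for (Entity in : inputs) {
        if (in.table.equals(targetTable) && in.partition != null) {
          rewritten.add(in.toString()); // one write per partition read
        }
      }
    }
    return rewritten;
  }

  public static void main(String[] args) {
    Set<Entity> inputs = new LinkedHashSet<>();
    inputs.add(new Entity("default@target", null));
    inputs.add(new Entity("default@target", "p=1/q=2"));
    inputs.add(new Entity("default@target", "p=1/q=3"));
    inputs.add(new Entity("default@target", "p=2/q=2"));
    Set<Entity> outputs = new LinkedHashSet<>();
    outputs.add(new Entity("default@target", null));
    // Prints [default@target@p=1/q=2, default@target@p=1/q=3, default@target@p=2/q=2]
    System.out.println(rewriteOutputs("default@target", inputs, outputs));
  }
}

This mirrors the entity sets recorded in acid_subquery.q.out below, where the update statements list partition-level Outputs rather than default@target.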
http://git-wip-us.apache.org/repos/asf/hive/blob/08d4e124/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 1626e2e..9bfcc82 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.output.StringBuilderWriter;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -44,6 +45,8 @@ import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileOutputStream;
@@ -63,6 +66,7 @@ import java.util.concurrent.TimeUnit;
* run with Acid 2.0 (see subclasses of TestTxnCommands2)
*/
public class TestTxnCommands {
+ static final private Logger LOG = LoggerFactory.getLogger(TestTxnCommands.class);
private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
File.separator + TestTxnCommands.class.getCanonicalName()
+ "-" + System.currentTimeMillis()
@@ -113,6 +117,7 @@ public class TestTxnCommands {
}
SessionState.start(new SessionState(hiveConf));
d = new Driver(hiveConf);
+ d.setMaxRows(10000);
dropTables();
runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
runStatementOnDriver("create table " + Table.NONACIDORCTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
@@ -731,6 +736,26 @@ public class TestTxnCommands {
List<String> r = runStatementOnDriver("select * from target order by key,data,cur");
Assert.assertEquals(stringifyValues(resultVals), r);
}
+
+ @Ignore("HIVE-14707")
+ @Test
+ public void testMergeInsertOnly() throws Exception {
+ String query = "merge into " + Table.ACIDTBL +
+ " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " +
+ "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) ";
+ d.destroy();
+ HiveConf hc = new HiveConf(hiveConf);
+ hc.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez");
+ d = new Driver(hc);
+ d.setMaxRows(10000);
+
+ List<String> explain = runStatementOnDriver("explain " + query);
+ StringBuilder sb = new StringBuilder();
+ for(String s : explain) {
+ sb.append(s).append('\n');
+ }
+ LOG.info("Explain1: " + sb);
+ }
@Test
public void testMergeUpdateDelete() throws Exception {
int[][] baseValsOdd = {{2,2},{4,44},{5,5},{11,11}};
http://git-wip-us.apache.org/repos/asf/hive/blob/08d4e124/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 637a01a..3c9358d 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -2098,35 +2098,4 @@ public class TestDbTxnManager2 {
TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_txnid=" + txnid2));
}
}
- //https://issues.apache.org/jira/browse/HIVE-15048
- @Test
- @Ignore("for some reason this fails with NPE in setUp() when run as part of the suite, but not standalone..")
- public void testUpdateWithSubquery() throws Exception {
- dropTable(new String[] {"target", "source"});
- checkCmdOnDriver(driver.run("create table target (a int, b int) " +
- "partitioned by (p int, q int) clustered by (a) into 2 buckets " +
- "stored as orc TBLPROPERTIES ('transactional'='true')"));
- checkCmdOnDriver(driver.run("create table source (a1 int, b1 int, p1 int, q1 int) clustered by (a1) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')"));
-
- checkCmdOnDriver(driver.run("insert into target partition(p,q) values (1,2,1,2), (3,4,1,2), (5,6,1,3), (7,8,2,2)"));
-
- checkCmdOnDriver(driver.run(
-"update target set b = 1 where p in (select t.q1 from source t where t.a1=5)"));
-/**
- * So the above query fails with invalid reference 'p' (in subquery) (as as if u use t.p)
- * But before it fails, here is inputs/outpus before/after UpdateDeleteSemanticAnalyzer
-* Before UDSA
-* inputs: [default@target, default@target@p=1/q=2, default@target@p=1/q=3, default@target@p=2/q=2]
-* outputs: [default@target]
-*
-* after UDSA
-* inputs: [default@target, default@target@p=1/q=2, default@target@p=1/q=3, default@target@p=2/q=2]
-* outputs: [default@target@p=1/q=2, default@target@p=1/q=3, default@target@p=2/q=2]
-*
-* So it looks like....
-*/
- checkCmdOnDriver(driver.run(
- "update source set b1 = 1 where p1 in (select t.q from target t where t.p=2)"));
-
- }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/08d4e124/ql/src/test/queries/clientpositive/acid_subquery.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/acid_subquery.q b/ql/src/test/queries/clientpositive/acid_subquery.q
new file mode 100644
index 0000000..ab87d4c
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/acid_subquery.q
@@ -0,0 +1,18 @@
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+
+drop table if exists target;
+drop table if exists source;
+
+create table target (a int, b int) partitioned by (p int, q int) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true');
+create table source (a1 int, b1 int, p1 int, q1 int) clustered by (a1) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true');
+insert into target partition(p,q) values (1,2,1,2), (3,4,1,2), (5,6,1,3), (7,8,2,2);
+
+-- the intent here is to record the set of ReadEntity and WriteEntity objects for these 2 update statements
+update target set b = 1 where p in (select t.q1 from source t where t.a1=5);
+
+update source set b1 = 1 where p1 in (select t.q from target t where t.p=2);
+
+-- the extra predicates in each when matched clause match 1 partition
+merge into target t using source s on t.a = s.a1 when matched and p = 1 and q = 2 then update set b = 1 when matched and p = 2 and q = 2 then delete when not matched and a1 > 100 then insert values(s.a1,s.b1,s.p1, s.q1);
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/08d4e124/ql/src/test/results/clientpositive/acid_subquery.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/acid_subquery.q.out b/ql/src/test/results/clientpositive/acid_subquery.q.out
new file mode 100644
index 0000000..b977768
--- /dev/null
+++ b/ql/src/test/results/clientpositive/acid_subquery.q.out
@@ -0,0 +1,101 @@
+PREHOOK: query: drop table if exists target
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists target
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table if exists source
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists source
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table target (a int, b int) partitioned by (p int, q int) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@target
+POSTHOOK: query: create table target (a int, b int) partitioned by (p int, q int) clustered by (a) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@target
+PREHOOK: query: create table source (a1 int, b1 int, p1 int, q1 int) clustered by (a1) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@source
+POSTHOOK: query: create table source (a1 int, b1 int, p1 int, q1 int) clustered by (a1) into 2 buckets stored as orc TBLPROPERTIES ('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@source
+PREHOOK: query: insert into target partition(p,q) values (1,2,1,2), (3,4,1,2), (5,6,1,3), (7,8,2,2)
+PREHOOK: type: QUERY
+PREHOOK: Output: default@target
+POSTHOOK: query: insert into target partition(p,q) values (1,2,1,2), (3,4,1,2), (5,6,1,3), (7,8,2,2)
+POSTHOOK: type: QUERY
+POSTHOOK: Output: default@target@p=1/q=2
+POSTHOOK: Output: default@target@p=1/q=3
+POSTHOOK: Output: default@target@p=2/q=2
+POSTHOOK: Lineage: target PARTITION(p=1,q=2).a EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: target PARTITION(p=1,q=2).b EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+POSTHOOK: Lineage: target PARTITION(p=1,q=3).a EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: target PARTITION(p=1,q=3).b EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+POSTHOOK: Lineage: target PARTITION(p=2,q=2).a EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: target PARTITION(p=2,q=2).b EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+PREHOOK: query: -- the intent here is to record the set of ReadEntity and WriteEntity objects for these 2 update statements
+update target set b = 1 where p in (select t.q1 from source t where t.a1=5)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@source
+PREHOOK: Input: default@target
+PREHOOK: Input: default@target@p=1/q=2
+PREHOOK: Input: default@target@p=1/q=3
+PREHOOK: Input: default@target@p=2/q=2
+PREHOOK: Output: default@target@p=1/q=2
+PREHOOK: Output: default@target@p=1/q=3
+PREHOOK: Output: default@target@p=2/q=2
+POSTHOOK: query: -- the intent here is to record the set of ReadEntity and WriteEntity objects for these 2 update statements
+update target set b = 1 where p in (select t.q1 from source t where t.a1=5)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@source
+POSTHOOK: Input: default@target
+POSTHOOK: Input: default@target@p=1/q=2
+POSTHOOK: Input: default@target@p=1/q=3
+POSTHOOK: Input: default@target@p=2/q=2
+POSTHOOK: Output: default@target@p=1/q=2
+POSTHOOK: Output: default@target@p=1/q=3
+POSTHOOK: Output: default@target@p=2/q=2
+PREHOOK: query: update source set b1 = 1 where p1 in (select t.q from target t where t.p=2)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@source
+PREHOOK: Input: default@target
+PREHOOK: Input: default@target@p=2/q=2
+PREHOOK: Output: default@source
+POSTHOOK: query: update source set b1 = 1 where p1 in (select t.q from target t where t.p=2)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@source
+POSTHOOK: Input: default@target
+POSTHOOK: Input: default@target@p=2/q=2
+POSTHOOK: Output: default@source
+PREHOOK: query: -- the extra predicates in each when matched clause match 1 partition
+merge into target t using source s on t.a = s.a1 when matched and p = 1 and q = 2 then update set b = 1 when matched and p = 2 and q = 2 then delete when not matched and a1 > 100 then insert values(s.a1,s.b1,s.p1, s.q1)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@source
+PREHOOK: Input: default@target
+PREHOOK: Input: default@target@p=1/q=2
+PREHOOK: Input: default@target@p=1/q=3
+PREHOOK: Input: default@target@p=2/q=2
+PREHOOK: Output: default@target
+PREHOOK: Output: default@target@p=1/q=2
+PREHOOK: Output: default@target@p=1/q=2
+PREHOOK: Output: default@target@p=1/q=3
+PREHOOK: Output: default@target@p=1/q=3
+PREHOOK: Output: default@target@p=2/q=2
+PREHOOK: Output: default@target@p=2/q=2
+POSTHOOK: query: -- the extra predicates in each when matched clause match 1 partition
+merge into target t using source s on t.a = s.a1 when matched and p = 1 and q = 2 then update set b = 1 when matched and p = 2 and q = 2 then delete when not matched and a1 > 100 then insert values(s.a1,s.b1,s.p1, s.q1)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@source
+POSTHOOK: Input: default@target
+POSTHOOK: Input: default@target@p=1/q=2
+POSTHOOK: Input: default@target@p=1/q=3
+POSTHOOK: Input: default@target@p=2/q=2
+POSTHOOK: Output: default@target@p=1/q=2
+POSTHOOK: Output: default@target@p=1/q=2
+POSTHOOK: Output: default@target@p=1/q=3
+POSTHOOK: Output: default@target@p=1/q=3
+POSTHOOK: Output: default@target@p=2/q=2
+POSTHOOK: Output: default@target@p=2/q=2