You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2016/12/15 21:27:47 UTC

hive git commit: HIVE-15048 Update/Delete statement using wrong WriteEntity when subqueries are involved (Eugene Koifman, reviewed by Alan Gates)

Repository: hive
Updated Branches:
  refs/heads/master 590687bc4 -> 08d4e124b


HIVE-15048 Update/Delete statement using wrong WriteEntity when subqueries are involved (Eugene Koifman, reviewed by Alan Gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/08d4e124
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/08d4e124
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/08d4e124

Branch: refs/heads/master
Commit: 08d4e124bc7685e22fa55d64893b7f9ca19156ab
Parents: 590687b
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Thu Dec 15 13:20:24 2016 -0800
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Thu Dec 15 13:20:54 2016 -0800

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/hooks/ReadEntity.java |   6 +-
 .../ql/parse/UpdateDeleteSemanticAnalyzer.java  |  60 ++++-------
 .../apache/hadoop/hive/ql/TestTxnCommands.java  |  25 +++++
 .../hive/ql/lockmgr/TestDbTxnManager2.java      |  31 ------
 .../test/queries/clientpositive/acid_subquery.q |  18 ++++
 .../results/clientpositive/acid_subquery.q.out  | 101 +++++++++++++++++++
 6 files changed, 169 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/08d4e124/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
index b805904..9f52ebe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
@@ -53,7 +53,11 @@ public class ReadEntity extends Entity implements Serializable {
   // important because in that case we shouldn't acquire a lock for it or authorize the read.
   // These will be handled by the output to the table instead.
   private boolean isUpdateOrDelete = false;
-  //https://issues.apache.org/jira/browse/HIVE-15048
+  /**
+   * https://issues.apache.org/jira/browse/HIVE-15048
+   * It is possible that the same table is used in top level query and a sub-query, e.g.
+   * select * from T where T.c in (select c from T inner join S on T.a=S.b)
+   */
   public transient boolean isFromTopLevelQuery = true;
 
 

http://git-wip-us.apache.org/repos/asf/hive/blob/08d4e124/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
index 05e1f43..79355ba 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
@@ -455,35 +455,8 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
       useSuper = false;
     }
 
-    markReadEntityForUpdate();
+    updateOutputs(mTable);
 
-    if (inputIsPartitioned(inputs)) {
-      //todo: there are bugs here: https://issues.apache.org/jira/browse/HIVE-15048
-      // In order to avoid locking the entire write table we need to replace the single WriteEntity
-      // with a WriteEntity for each partition
-      assert outputs.size() == 1 : "expected 1 WriteEntity. Got " + outputs;//this asserts comment above
-      WriteEntity original = null;
-      for(WriteEntity we : outputs) {
-        original = we;
-      }
-      outputs.clear();
-      for (ReadEntity input : inputs) {
-        /**
-         * The assumption here is that SemanticAnalyzer will will generate ReadEntity for each
-         * partition that exists and is matched by the WHERE clause (which may be all of them).
-         * Since we don't allow updating the value of a partition column, we know that we always
-         * write the same (or fewer) partitions than we read.  Still, the write is a Dynamic
-         * Partition write - see HIVE-15032.
-         */
-        if (input.getTyp() == Entity.Type.PARTITION) {
-          WriteEntity.WriteType writeType = deleting() ? WriteEntity.WriteType.DELETE :
-              WriteEntity.WriteType.UPDATE;
-          WriteEntity we = new WriteEntity(input.getPartition(), writeType);
-          we.setDynamicPartitionWrite(original.isDynamicPartitionWrite());
-          outputs.add(we);
-        }
-      }
-    }
 
     if (updating()) {
       setUpAccessControlInfoForUpdate(mTable, setCols);
@@ -517,17 +490,6 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
     return currentOperation.toString();
   }
 
-  private boolean inputIsPartitioned(Set<ReadEntity> inputs) {
-    // We cannot simply look at the first entry, as in the case where the input is partitioned
-    // there will be a table entry as well.  So look for at least one partition entry.
-    for (ReadEntity re : inputs) {
-      if (re.getTyp() == Entity.Type.PARTITION) {
-        return true;
-      }
-    }
-    return false;
-  }
-
   // This method finds any columns on the right side of a set statement (thus rcols) and puts them
   // in a set so we can add them to the list of input cols to check.
   private void addSetRCols(ASTNode node, Set<String> setRCols) {
@@ -757,7 +719,17 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
     } finally {
       useSuper = false;
     }
+    updateOutputs(targetTable);
+  }
 
+  /**
+   * SemanticAnalyzer will generate a WriteEntity for the target table since it doesn't know/check
+   * if the read and write are of the same table in "insert ... select ....".  Since DbTxnManager
+   * uses Read/WriteEntity objects to decide which locks to acquire, we get more concurrency if we
+   * change the table WriteEntity to a set of partition WriteEntity objects based on
+   * ReadEntity objects computed for this table.
+   */
+  private void updateOutputs(Table targetTable) {
     markReadEntityForUpdate();
 
     if(targetTable.isPartitioned()) {
@@ -770,6 +742,13 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
           WriteEntity.WriteType wt = we.getWriteType();
           if(isTargetTable(we, targetTable) &&
             (wt == WriteEntity.WriteType.UPDATE || wt == WriteEntity.WriteType.DELETE)) {
+            /**
+             * The assumption here is that SemanticAnalyzer will generate ReadEntity for each
+             * partition that exists and is matched by the WHERE clause (which may be all of them).
+             * Since we don't allow updating the value of a partition column, we know that we always
+             * write the same (or fewer) partitions than we read.  Still, the write is a Dynamic
+             * Partition write - see HIVE-15032.
+             */
             toRemove.add(we);
           }
         }
@@ -797,7 +776,8 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
    * be able to not use DP for the Insert...
    *
    * Note that the Insert of Merge may be creating new partitions and writing to partitions
-   * which were not read  (WHEN NOT MATCHED...)
+   * which were not read  (WHEN NOT MATCHED...).  WriteEntity for that should be created
+   * in MoveTask (or some other task after the query is complete)
    */
   private List<ReadEntity> getRestrictedPartitionSet(Table targetTable) {
     List<ReadEntity> partitionsRead = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/hive/blob/08d4e124/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 1626e2e..9bfcc82 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.output.StringBuilderWriter;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -44,6 +45,8 @@ import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.FileOutputStream;
@@ -63,6 +66,7 @@ import java.util.concurrent.TimeUnit;
  * run with Acid 2.0 (see subclasses of TestTxnCommands2)
  */
 public class TestTxnCommands {
+  static final private Logger LOG = LoggerFactory.getLogger(TestTxnCommands.class);
   private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") +
     File.separator + TestTxnCommands.class.getCanonicalName()
     + "-" + System.currentTimeMillis()
@@ -113,6 +117,7 @@ public class TestTxnCommands {
     }
     SessionState.start(new SessionState(hiveConf));
     d = new Driver(hiveConf);
+    d.setMaxRows(10000);
     dropTables();
     runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
     runStatementOnDriver("create table " + Table.NONACIDORCTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
@@ -731,6 +736,26 @@ public class TestTxnCommands {
     List<String> r = runStatementOnDriver("select * from target order by key,data,cur");
     Assert.assertEquals(stringifyValues(resultVals), r);
   }
+  
+  @Ignore("HIVE-14707")
+  @Test
+  public void testMergeInsertOnly() throws Exception {
+    String query = "merge into " + Table.ACIDTBL +
+      " as t using " + Table.NONACIDORCTBL + " s ON t.a = s.a " +
+      "WHEN NOT MATCHED THEN INSERT VALUES(s.a, s.b) ";
+    d.destroy();
+    HiveConf hc = new HiveConf(hiveConf);
+    hc.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez");
+    d = new Driver(hc);
+    d.setMaxRows(10000);
+
+    List<String> explain = runStatementOnDriver("explain " + query);
+    StringBuilder sb = new StringBuilder();
+    for(String s : explain) {
+      sb.append(s).append('\n');
+    }
+    LOG.info("Explain1: " + sb);
+  }
   @Test
   public void testMergeUpdateDelete() throws Exception {
     int[][] baseValsOdd = {{2,2},{4,44},{5,5},{11,11}};

http://git-wip-us.apache.org/repos/asf/hive/blob/08d4e124/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 637a01a..3c9358d 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -2098,35 +2098,4 @@ public class TestDbTxnManager2 {
         TxnDbUtil.countQueryAgent("select count(*) from WRITE_SET where ws_txnid=" + txnid2));
     }
   }
-  //https://issues.apache.org/jira/browse/HIVE-15048
-  @Test
-  @Ignore("for some reason this fails with NPE in setUp() when run as part of the suite, but not standalone..")
-  public void testUpdateWithSubquery() throws Exception {
-    dropTable(new String[] {"target", "source"});
-    checkCmdOnDriver(driver.run("create table target (a int, b int) " +
-      "partitioned by (p int, q int) clustered by (a) into 2  buckets " +
-      "stored as orc TBLPROPERTIES ('transactional'='true')"));
-    checkCmdOnDriver(driver.run("create table source (a1 int, b1 int, p1 int, q1 int) clustered by (a1) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')"));
-
-    checkCmdOnDriver(driver.run("insert into target partition(p,q) values (1,2,1,2), (3,4,1,2), (5,6,1,3), (7,8,2,2)"));
-    
-    checkCmdOnDriver(driver.run(
-"update target set b = 1 where p in (select t.q1 from source t where t.a1=5)"));
-/**
- * So the above query fails with invalid reference 'p' (in subquery)  (as as if u use t.p)
- * But before it fails, here is inputs/outpus before/after UpdateDeleteSemanticAnalyzer
-* Before UDSA
-* inputs:  [default@target, default@target@p=1/q=2, default@target@p=1/q=3, default@target@p=2/q=2]
-* outputs: [default@target]
-* 
-* after UDSA
-* inputs:  [default@target, default@target@p=1/q=2, default@target@p=1/q=3, default@target@p=2/q=2]
-* outputs: [default@target@p=1/q=2, default@target@p=1/q=3, default@target@p=2/q=2]
-* 
-* So it looks like....
-*/
-    checkCmdOnDriver(driver.run(
-      "update source set b1 = 1 where p1 in (select t.q from target t where t.p=2)"));
-
-  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/08d4e124/ql/src/test/queries/clientpositive/acid_subquery.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/acid_subquery.q b/ql/src/test/queries/clientpositive/acid_subquery.q
new file mode 100644
index 0000000..ab87d4c
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/acid_subquery.q
@@ -0,0 +1,18 @@
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+
+drop table if exists target;
+drop table if exists source;
+
+create table target (a int, b int) partitioned by (p int, q int) clustered by (a) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true');
+create table source (a1 int, b1 int, p1 int, q1 int) clustered by (a1) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true');
+insert into target partition(p,q) values (1,2,1,2), (3,4,1,2), (5,6,1,3), (7,8,2,2);
+
+-- the intent here is to record the set of ReadEntity and WriteEntity objects for these 2 update statements
+update target set b = 1 where p in (select t.q1 from source t where t.a1=5);
+
+update source set b1 = 1 where p1 in (select t.q from target t where t.p=2);
+
+-- the extra predicates in when matched clause match 1 partition
+merge into target t using source s on t.a = s.a1 when matched and p = 1 and q = 2 then update set b = 1 when matched and p = 2 and q = 2 then delete when not matched and a1 > 100 then insert values(s.a1,s.b1,s.p1, s.q1);
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/08d4e124/ql/src/test/results/clientpositive/acid_subquery.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/acid_subquery.q.out b/ql/src/test/results/clientpositive/acid_subquery.q.out
new file mode 100644
index 0000000..b977768
--- /dev/null
+++ b/ql/src/test/results/clientpositive/acid_subquery.q.out
@@ -0,0 +1,101 @@
+PREHOOK: query: drop table if exists target
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists target
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table if exists source
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists source
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table target (a int, b int) partitioned by (p int, q int) clustered by (a) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@target
+POSTHOOK: query: create table target (a int, b int) partitioned by (p int, q int) clustered by (a) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@target
+PREHOOK: query: create table source (a1 int, b1 int, p1 int, q1 int) clustered by (a1) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@source
+POSTHOOK: query: create table source (a1 int, b1 int, p1 int, q1 int) clustered by (a1) into 2  buckets stored as orc TBLPROPERTIES ('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@source
+PREHOOK: query: insert into target partition(p,q) values (1,2,1,2), (3,4,1,2), (5,6,1,3), (7,8,2,2)
+PREHOOK: type: QUERY
+PREHOOK: Output: default@target
+POSTHOOK: query: insert into target partition(p,q) values (1,2,1,2), (3,4,1,2), (5,6,1,3), (7,8,2,2)
+POSTHOOK: type: QUERY
+POSTHOOK: Output: default@target@p=1/q=2
+POSTHOOK: Output: default@target@p=1/q=3
+POSTHOOK: Output: default@target@p=2/q=2
+POSTHOOK: Lineage: target PARTITION(p=1,q=2).a EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: target PARTITION(p=1,q=2).b EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+POSTHOOK: Lineage: target PARTITION(p=1,q=3).a EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: target PARTITION(p=1,q=3).b EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+POSTHOOK: Lineage: target PARTITION(p=2,q=2).a EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+POSTHOOK: Lineage: target PARTITION(p=2,q=2).b EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+PREHOOK: query: -- the intent here is to record the set of ReadEntity and WriteEntity objects for these 2 update statements
+update target set b = 1 where p in (select t.q1 from source t where t.a1=5)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@source
+PREHOOK: Input: default@target
+PREHOOK: Input: default@target@p=1/q=2
+PREHOOK: Input: default@target@p=1/q=3
+PREHOOK: Input: default@target@p=2/q=2
+PREHOOK: Output: default@target@p=1/q=2
+PREHOOK: Output: default@target@p=1/q=3
+PREHOOK: Output: default@target@p=2/q=2
+POSTHOOK: query: -- the intent here is to record the set of ReadEntity and WriteEntity objects for these 2 update statements
+update target set b = 1 where p in (select t.q1 from source t where t.a1=5)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@source
+POSTHOOK: Input: default@target
+POSTHOOK: Input: default@target@p=1/q=2
+POSTHOOK: Input: default@target@p=1/q=3
+POSTHOOK: Input: default@target@p=2/q=2
+POSTHOOK: Output: default@target@p=1/q=2
+POSTHOOK: Output: default@target@p=1/q=3
+POSTHOOK: Output: default@target@p=2/q=2
+PREHOOK: query: update source set b1 = 1 where p1 in (select t.q from target t where t.p=2)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@source
+PREHOOK: Input: default@target
+PREHOOK: Input: default@target@p=2/q=2
+PREHOOK: Output: default@source
+POSTHOOK: query: update source set b1 = 1 where p1 in (select t.q from target t where t.p=2)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@source
+POSTHOOK: Input: default@target
+POSTHOOK: Input: default@target@p=2/q=2
+POSTHOOK: Output: default@source
+PREHOOK: query: -- the extra predicates in when matched clause match 1 partition
+merge into target t using source s on t.a = s.a1 when matched and p = 1 and q = 2 then update set b = 1 when matched and p = 2 and q = 2 then delete when not matched and a1 > 100 then insert values(s.a1,s.b1,s.p1, s.q1)
+PREHOOK: type: QUERY
+PREHOOK: Input: default@source
+PREHOOK: Input: default@target
+PREHOOK: Input: default@target@p=1/q=2
+PREHOOK: Input: default@target@p=1/q=3
+PREHOOK: Input: default@target@p=2/q=2
+PREHOOK: Output: default@target
+PREHOOK: Output: default@target@p=1/q=2
+PREHOOK: Output: default@target@p=1/q=2
+PREHOOK: Output: default@target@p=1/q=3
+PREHOOK: Output: default@target@p=1/q=3
+PREHOOK: Output: default@target@p=2/q=2
+PREHOOK: Output: default@target@p=2/q=2
+POSTHOOK: query: -- the extra predicates in when matched clause match 1 partition
+merge into target t using source s on t.a = s.a1 when matched and p = 1 and q = 2 then update set b = 1 when matched and p = 2 and q = 2 then delete when not matched and a1 > 100 then insert values(s.a1,s.b1,s.p1, s.q1)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@source
+POSTHOOK: Input: default@target
+POSTHOOK: Input: default@target@p=1/q=2
+POSTHOOK: Input: default@target@p=1/q=3
+POSTHOOK: Input: default@target@p=2/q=2
+POSTHOOK: Output: default@target@p=1/q=2
+POSTHOOK: Output: default@target@p=1/q=2
+POSTHOOK: Output: default@target@p=1/q=3
+POSTHOOK: Output: default@target@p=1/q=3
+POSTHOOK: Output: default@target@p=2/q=2
+POSTHOOK: Output: default@target@p=2/q=2