You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by px...@apache.org on 2015/08/16 09:11:33 UTC

[2/2] hive git commit: HIVE-10062: HiveOnTez: Union followed by Multi-GB followed by Multi-insert loses data (Pengcheng Xiong via Gunther Hagleitner)

HIVE-10062: HiveOnTez: Union followed by Multi-GB followed by Multi-insert loses data (Pengcheng Xiong via Gunther Hagleitner)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d10dee33
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d10dee33
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d10dee33

Branch: refs/heads/branch-1.0
Commit: d10dee33481deac40d9ed6235d2e18c17bceb8ee
Parents: aeeb329
Author: Gunther Hagleitner <gu...@apache.org>
Authored: Thu Apr 23 13:48:31 2015 -0700
Committer: Pengcheng Xiong <px...@apache.org>
Committed: Sun Aug 16 00:10:55 2015 -0700

----------------------------------------------------------------------
 .../test/resources/testconfiguration.properties |    3 +-
 .../hadoop/hive/ql/parse/GenTezProcContext.java |    5 +-
 .../hadoop/hive/ql/parse/GenTezUtils.java       |    5 +-
 .../apache/hadoop/hive/ql/parse/GenTezWork.java |   40 +-
 .../hadoop/hive/ql/parse/GenTezWorkWalker.java  |    6 +
 .../clientpositive/tez_union_multiinsert.q      |  128 +
 .../tez/tez_union_multiinsert.q.out             | 4355 ++++++++++++++++++
 7 files changed, 4524 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d10dee33/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 18289f7..bc7f9ea 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -287,7 +287,8 @@ minitez.query.files=bucket_map_join_tez1.q,\
   tez_smb_1.q,\
   vectorized_dynamic_partition_pruning.q,\
   tez_multi_union.q,\
-  tez_join.q
+  tez_join.q,\
+  tez_union_multiinsert.q
 
 beeline.positive.exclude=add_part_exist.q,\
   alter1.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/d10dee33/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
index 90616ad..adc31ae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.UnionWork;
 
 /**
  * GenTezProcContext. GenTezProcContext maintains information
@@ -124,7 +125,8 @@ public class GenTezProcContext implements NodeProcessorCtx{
 
   // used to hook up unions
   public final Map<Operator<?>, BaseWork> unionWorkMap;
-  public final List<UnionOperator> currentUnionOperators;
+  public final Map<Operator<?>, UnionWork> rootUnionWorkMap;
+  public List<UnionOperator> currentUnionOperators;
   public final Set<BaseWork> workWithUnionOperators;
   public final Set<ReduceSinkOperator> clonedReduceSinks;
 
@@ -171,6 +173,7 @@ public class GenTezProcContext implements NodeProcessorCtx{
     this.dependencyTask = (DependencyCollectionTask)
         TaskFactory.get(new DependencyCollectionWork(), conf);
     this.unionWorkMap = new LinkedHashMap<Operator<?>, BaseWork>();
+    this.rootUnionWorkMap = new LinkedHashMap<Operator<?>, UnionWork>();
     this.currentUnionOperators = new LinkedList<UnionOperator>();
     this.workWithUnionOperators = new LinkedHashSet<BaseWork>();
     this.clonedReduceSinks = new LinkedHashSet<ReduceSinkOperator>();

http://git-wip-us.apache.org/repos/asf/hive/blob/d10dee33/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index 627ca7b..bbef50a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -87,9 +87,10 @@ public class GenTezUtils {
     sequenceNumber = 0;
   }
 
-  public UnionWork createUnionWork(GenTezProcContext context, Operator<?> operator, TezWork tezWork) {
+  public UnionWork createUnionWork(GenTezProcContext context, Operator<?> root, Operator<?> leaf, TezWork tezWork) {
     UnionWork unionWork = new UnionWork("Union "+ (++sequenceNumber));
-    context.unionWorkMap.put(operator, unionWork);
+    context.rootUnionWorkMap.put(root, unionWork);
+    context.unionWorkMap.put(leaf, unionWork);
     tezWork.add(unionWork);
     return unionWork;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/d10dee33/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
index 6a87929..7b1b3cd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
@@ -270,8 +270,8 @@ public class GenTezWork implements NodeProcessor {
     }
 
     if (!context.currentUnionOperators.isEmpty()) {
-      // if there are union all operators we need to add the work to the set
-      // of union operators.
+      // if currentUnionOperators is non-empty, the current walking context contains union all operators.
+      // for more details on context.currentUnionOperators, please see GenTezWorkWalker
 
       UnionWork unionWork;
       if (context.unionWorkMap.containsKey(operator)) {
@@ -280,22 +280,25 @@ public class GenTezWork implements NodeProcessor {
         // since we've passed this operator before.
         assert operator.getChildOperators().isEmpty();
         unionWork = (UnionWork) context.unionWorkMap.get(operator);
-
+        // finally connect the union work with work
+        connectUnionWorkWithWork(unionWork, work, tezWork, context);
       } else {
-        // first time through. we need to create a union work object and add this
-        // work to it. Subsequent work should reference the union and not the actual
-        // work.
-        unionWork = utils.createUnionWork(context, operator, tezWork);
+        // we've not seen this terminal before. we need to check
+        // rootUnionWorkMap, which maps the root operator of a union work
+        // to that union work
+        unionWork = context.rootUnionWorkMap.get(root);
+        if (unionWork == null) {
+          // if unionWork is null, this is the first time through. we need to
+          // create a union work object and add this work to it. Subsequent
+          // work should reference the union and not the actual work.
+          unionWork = utils.createUnionWork(context, root, operator, tezWork);
+          // finally connect the union work with work
+          connectUnionWorkWithWork(unionWork, work, tezWork, context);
+        }
       }
-
-      // finally hook everything up
-      LOG.debug("Connecting union work ("+unionWork+") with work ("+work+")");
-      TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.CONTAINS);
-      tezWork.connect(unionWork, work, edgeProp);
-      unionWork.addUnionOperators(context.currentUnionOperators);
       context.currentUnionOperators.clear();
-      context.workWithUnionOperators.add(work);
       work = unionWork;
+
     }
 
     // We're scanning a tree from roots to leaf (this is not technically
@@ -415,4 +418,13 @@ public class GenTezWork implements NodeProcessor {
     int pos = stack.indexOf(currentMergeJoinOperator);
     return (Operator<? extends OperatorDesc>) stack.get(pos - 1);
   }
+  
+  private void connectUnionWorkWithWork(UnionWork unionWork, BaseWork work, TezWork tezWork,
+      GenTezProcContext context) {
+    LOG.debug("Connecting union work (" + unionWork + ") with work (" + work + ")");
+    TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.CONTAINS);
+    tezWork.connect(unionWork, work, edgeProp);
+    unionWork.addUnionOperators(context.currentUnionOperators);
+    context.workWithUnionOperators.add(work);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/d10dee33/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWorkWalker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWorkWalker.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWorkWalker.java
index 08fd61e..cefe24e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWorkWalker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWorkWalker.java
@@ -18,11 +18,13 @@
 
 package org.apache.hadoop.hive.ql.parse;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
 import org.apache.hadoop.hive.ql.lib.Node;
@@ -52,6 +54,7 @@ public class GenTezWorkWalker extends DefaultGraphWalker {
     ctx.currentRootOperator = (Operator<? extends OperatorDesc>) nd;
     ctx.preceedingWork = null;
     ctx.parentOfRoot = null;
+    ctx.currentUnionOperators = new ArrayList();
   }
 
   /**
@@ -89,6 +92,7 @@ public class GenTezWorkWalker extends DefaultGraphWalker {
     // save some positional state
     Operator<? extends OperatorDesc> currentRoot = ctx.currentRootOperator;
     Operator<? extends OperatorDesc> parentOfRoot = ctx.parentOfRoot;
+    List<UnionOperator> currentUnionOperators = ctx.currentUnionOperators;
     BaseWork preceedingWork = ctx.preceedingWork;
 
     if (skip == null || !skip) {
@@ -99,6 +103,8 @@ public class GenTezWorkWalker extends DefaultGraphWalker {
         ctx.currentRootOperator = currentRoot;
         ctx.parentOfRoot = parentOfRoot;
         ctx.preceedingWork = preceedingWork;
+        ctx.currentUnionOperators = new ArrayList();
+        ctx.currentUnionOperators.addAll(currentUnionOperators);
 
         walk(ch);
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/d10dee33/ql/src/test/queries/clientpositive/tez_union_multiinsert.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/tez_union_multiinsert.q b/ql/src/test/queries/clientpositive/tez_union_multiinsert.q
new file mode 100644
index 0000000..a3500a6
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/tez_union_multiinsert.q
@@ -0,0 +1,128 @@
+-- SORT_QUERY_RESULTS
+
+CREATE TABLE DEST1(key STRING, value STRING) STORED AS TEXTFILE;
+
+CREATE TABLE DEST2(key STRING, val1 STRING, val2 STRING) STORED AS TEXTFILE;
+
+explain
+FROM (
+      select key, value from (
+      select 'tst1' as key, cast(count(1) as string) as value, 'tst1' as value2 from src s1
+                         UNION all 
+      select s2.key as key, s2.value as value, 'tst1' as value2 from src s2) unionsub
+                         UNION all
+      select key, value from src s0
+                             ) unionsrc
+INSERT OVERWRITE TABLE DEST1 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT OVERWRITE TABLE DEST2 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) 
+GROUP BY unionsrc.key, unionsrc.value;
+
+FROM (
+      select key, value from (
+      select 'tst1' as key, cast(count(1) as string) as value, 'tst1' as value2 from src s1
+                         UNION all 
+      select s2.key as key, s2.value as value, 'tst1' as value2 from src s2) unionsub
+                         UNION all
+      select key, value from src s0
+                             ) unionsrc
+INSERT OVERWRITE TABLE DEST1 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT OVERWRITE TABLE DEST2 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) 
+GROUP BY unionsrc.key, unionsrc.value;
+
+select * from DEST1;
+select * from DEST2;
+
+explain
+FROM (
+      select key, value from src s0
+                         UNION all
+      select key, value from (
+      select 'tst1' as key, cast(count(1) as string) as value, 'tst1' as value2 from src s1
+                         UNION all 
+      select s2.key as key, s2.value as value, 'tst1' as value2 from src s2) unionsub) unionsrc
+INSERT OVERWRITE TABLE DEST1 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT OVERWRITE TABLE DEST2 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) 
+GROUP BY unionsrc.key, unionsrc.value;
+
+FROM (
+      select key, value from src s0
+                         UNION all
+      select key, value from (
+      select 'tst1' as key, cast(count(1) as string) as value, 'tst1' as value2 from src s1
+                         UNION all 
+      select s2.key as key, s2.value as value, 'tst1' as value2 from src s2) unionsub) unionsrc
+INSERT OVERWRITE TABLE DEST1 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT OVERWRITE TABLE DEST2 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) 
+GROUP BY unionsrc.key, unionsrc.value;
+
+select * from DEST1;
+select * from DEST2;
+
+
+explain
+FROM (
+      select key, value from src s0
+                         UNION all
+      select 'tst1' as key, cast(count(1) as string) as value from src s1
+                         UNION all 
+      select s2.key as key, s2.value as value from src s2) unionsrc
+INSERT OVERWRITE TABLE DEST1 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT OVERWRITE TABLE DEST2 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) 
+GROUP BY unionsrc.key, unionsrc.value;
+
+FROM (
+      select key, value from src s0
+                         UNION all
+      select 'tst1' as key, cast(count(1) as string) as value from src s1
+                         UNION all 
+      select s2.key as key, s2.value as value from src s2) unionsrc
+INSERT OVERWRITE TABLE DEST1 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT OVERWRITE TABLE DEST2 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) 
+GROUP BY unionsrc.key, unionsrc.value;
+
+select * from DEST1;
+select * from DEST2;
+
+explain 
+FROM (select 'tst1' as key, cast(count(1) as string) as value from src s1
+                         UNION all 
+      select s2.key as key, s2.value as value from src s2) unionsrc
+INSERT OVERWRITE TABLE DEST1 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT OVERWRITE TABLE DEST2 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) 
+GROUP BY unionsrc.key, unionsrc.value;
+
+FROM (select 'tst1' as key, cast(count(1) as string) as value from src s1
+                         UNION all 
+      select s2.key as key, s2.value as value from src s2) unionsrc
+INSERT OVERWRITE TABLE DEST1 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT OVERWRITE TABLE DEST2 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) 
+GROUP BY unionsrc.key, unionsrc.value;
+
+select * from DEST1;
+select * from DEST2;
+
+explain
+FROM (
+      select key, value from (
+      select 'tst1' as key, cast(count(1) as string) as value from src s1
+                         UNION all 
+      select s2.key as key, s2.value as value from src s2)subq group by key, value
+      )
+      unionsrc
+INSERT OVERWRITE TABLE DEST1 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT OVERWRITE TABLE DEST2 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) 
+GROUP BY unionsrc.key, unionsrc.value;
+
+FROM (
+      select key, value from (
+      select 'tst1' as key, cast(count(1) as string) as value from src s1
+                         UNION all 
+      select s2.key as key, s2.value as value from src s2)subq group by key, value
+      )
+      unionsrc
+INSERT OVERWRITE TABLE DEST1 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT OVERWRITE TABLE DEST2 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) 
+GROUP BY unionsrc.key, unionsrc.value;
+
+select * from DEST1;
+select * from DEST2;
\ No newline at end of file