You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by px...@apache.org on 2015/08/16 09:11:33 UTC
[2/2] hive git commit: HIVE-10062: HiveOnTez: Union followed by Multi-GB followed by Multi-insert loses data (Pengcheng Xiong via Gunther Hagleitner)
HIVE-10062: HiveOnTez: Union followed by Multi-GB followed by Multi-insert loses data (Pengcheng Xiong via Gunther Hagleitner)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d10dee33
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d10dee33
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d10dee33
Branch: refs/heads/branch-1.0
Commit: d10dee33481deac40d9ed6235d2e18c17bceb8ee
Parents: aeeb329
Author: Gunther Hagleitner <gu...@apache.org>
Authored: Thu Apr 23 13:48:31 2015 -0700
Committer: Pengcheng Xiong <px...@apache.org>
Committed: Sun Aug 16 00:10:55 2015 -0700
----------------------------------------------------------------------
.../test/resources/testconfiguration.properties | 3 +-
.../hadoop/hive/ql/parse/GenTezProcContext.java | 5 +-
.../hadoop/hive/ql/parse/GenTezUtils.java | 5 +-
.../apache/hadoop/hive/ql/parse/GenTezWork.java | 40 +-
.../hadoop/hive/ql/parse/GenTezWorkWalker.java | 6 +
.../clientpositive/tez_union_multiinsert.q | 128 +
.../tez/tez_union_multiinsert.q.out | 4355 ++++++++++++++++++
7 files changed, 4524 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/d10dee33/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 18289f7..bc7f9ea 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -287,7 +287,8 @@ minitez.query.files=bucket_map_join_tez1.q,\
tez_smb_1.q,\
vectorized_dynamic_partition_pruning.q,\
tez_multi_union.q,\
- tez_join.q
+ tez_join.q,\
+ tez_union_multiinsert.q
beeline.positive.exclude=add_part_exist.q,\
alter1.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/d10dee33/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
index 90616ad..adc31ae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.UnionWork;
/**
* GenTezProcContext. GenTezProcContext maintains information
@@ -124,7 +125,8 @@ public class GenTezProcContext implements NodeProcessorCtx{
// used to hook up unions
public final Map<Operator<?>, BaseWork> unionWorkMap;
- public final List<UnionOperator> currentUnionOperators;
+ public final Map<Operator<?>, UnionWork> rootUnionWorkMap;
+ public List<UnionOperator> currentUnionOperators;
public final Set<BaseWork> workWithUnionOperators;
public final Set<ReduceSinkOperator> clonedReduceSinks;
@@ -171,6 +173,7 @@ public class GenTezProcContext implements NodeProcessorCtx{
this.dependencyTask = (DependencyCollectionTask)
TaskFactory.get(new DependencyCollectionWork(), conf);
this.unionWorkMap = new LinkedHashMap<Operator<?>, BaseWork>();
+ this.rootUnionWorkMap = new LinkedHashMap<Operator<?>, UnionWork>();
this.currentUnionOperators = new LinkedList<UnionOperator>();
this.workWithUnionOperators = new LinkedHashSet<BaseWork>();
this.clonedReduceSinks = new LinkedHashSet<ReduceSinkOperator>();
http://git-wip-us.apache.org/repos/asf/hive/blob/d10dee33/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
index 627ca7b..bbef50a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
@@ -87,9 +87,10 @@ public class GenTezUtils {
sequenceNumber = 0;
}
- public UnionWork createUnionWork(GenTezProcContext context, Operator<?> operator, TezWork tezWork) {
+ public UnionWork createUnionWork(GenTezProcContext context, Operator<?> root, Operator<?> leaf, TezWork tezWork) {
UnionWork unionWork = new UnionWork("Union "+ (++sequenceNumber));
- context.unionWorkMap.put(operator, unionWork);
+ context.rootUnionWorkMap.put(root, unionWork);
+ context.unionWorkMap.put(leaf, unionWork);
tezWork.add(unionWork);
return unionWork;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/d10dee33/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
index 6a87929..7b1b3cd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
@@ -270,8 +270,8 @@ public class GenTezWork implements NodeProcessor {
}
if (!context.currentUnionOperators.isEmpty()) {
- // if there are union all operators we need to add the work to the set
- // of union operators.
+ // a non-empty context.currentUnionOperators means the current walk has passed through union all operators.
+ // see GenTezWorkWalker for how context.currentUnionOperators is maintained during the walk
UnionWork unionWork;
if (context.unionWorkMap.containsKey(operator)) {
@@ -280,22 +280,25 @@ public class GenTezWork implements NodeProcessor {
// since we've passed this operator before.
assert operator.getChildOperators().isEmpty();
unionWork = (UnionWork) context.unionWorkMap.get(operator);
-
+ // finally connect the union work with work
+ connectUnionWorkWithWork(unionWork, work, tezWork, context);
} else {
- // first time through. we need to create a union work object and add this
- // work to it. Subsequent work should reference the union and not the actual
- // work.
- unionWork = utils.createUnionWork(context, operator, tezWork);
+ // we have not seen this terminal operator before. check
+ // rootUnionWorkMap, which maps the root operator of a union
+ // work to that union work
+ unionWork = context.rootUnionWorkMap.get(root);
+ if (unionWork == null) {
+ // no union work exists for this root yet, so this is the first time
+ // through: create a union work object and add this work to it.
+ // Subsequent work should reference the union and not the actual work.
+ unionWork = utils.createUnionWork(context, root, operator, tezWork);
+ // finally connect the union work with work
+ connectUnionWorkWithWork(unionWork, work, tezWork, context);
+ }
}
-
- // finally hook everything up
- LOG.debug("Connecting union work ("+unionWork+") with work ("+work+")");
- TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.CONTAINS);
- tezWork.connect(unionWork, work, edgeProp);
- unionWork.addUnionOperators(context.currentUnionOperators);
context.currentUnionOperators.clear();
- context.workWithUnionOperators.add(work);
work = unionWork;
+
}
// We're scanning a tree from roots to leaf (this is not technically
@@ -415,4 +418,13 @@ public class GenTezWork implements NodeProcessor {
int pos = stack.indexOf(currentMergeJoinOperator);
return (Operator<? extends OperatorDesc>) stack.get(pos - 1);
}
+
+ private void connectUnionWorkWithWork(UnionWork unionWork, BaseWork work, TezWork tezWork,
+ GenTezProcContext context) {
+ LOG.debug("Connecting union work (" + unionWork + ") with work (" + work + ")");
+ TezEdgeProperty edgeProp = new TezEdgeProperty(EdgeType.CONTAINS);
+ tezWork.connect(unionWork, work, edgeProp);
+ unionWork.addUnionOperators(context.currentUnionOperators);
+ context.workWithUnionOperators.add(work);
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/d10dee33/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWorkWalker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWorkWalker.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWorkWalker.java
index 08fd61e..cefe24e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWorkWalker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWorkWalker.java
@@ -18,11 +18,13 @@
package org.apache.hadoop.hive.ql.parse;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
import org.apache.hadoop.hive.ql.lib.Node;
@@ -52,6 +54,7 @@ public class GenTezWorkWalker extends DefaultGraphWalker {
ctx.currentRootOperator = (Operator<? extends OperatorDesc>) nd;
ctx.preceedingWork = null;
ctx.parentOfRoot = null;
+ ctx.currentUnionOperators = new ArrayList();
}
/**
@@ -89,6 +92,7 @@ public class GenTezWorkWalker extends DefaultGraphWalker {
// save some positional state
Operator<? extends OperatorDesc> currentRoot = ctx.currentRootOperator;
Operator<? extends OperatorDesc> parentOfRoot = ctx.parentOfRoot;
+ List<UnionOperator> currentUnionOperators = ctx.currentUnionOperators;
BaseWork preceedingWork = ctx.preceedingWork;
if (skip == null || !skip) {
@@ -99,6 +103,8 @@ public class GenTezWorkWalker extends DefaultGraphWalker {
ctx.currentRootOperator = currentRoot;
ctx.parentOfRoot = parentOfRoot;
ctx.preceedingWork = preceedingWork;
+ ctx.currentUnionOperators = new ArrayList();
+ ctx.currentUnionOperators.addAll(currentUnionOperators);
walk(ch);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/d10dee33/ql/src/test/queries/clientpositive/tez_union_multiinsert.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/tez_union_multiinsert.q b/ql/src/test/queries/clientpositive/tez_union_multiinsert.q
new file mode 100644
index 0000000..a3500a6
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/tez_union_multiinsert.q
@@ -0,0 +1,128 @@
+-- SORT_QUERY_RESULTS
+
+CREATE TABLE DEST1(key STRING, value STRING) STORED AS TEXTFILE;
+
+CREATE TABLE DEST2(key STRING, val1 STRING, val2 STRING) STORED AS TEXTFILE;
+
+explain
+FROM (
+ select key, value from (
+ select 'tst1' as key, cast(count(1) as string) as value, 'tst1' as value2 from src s1
+ UNION all
+ select s2.key as key, s2.value as value, 'tst1' as value2 from src s2) unionsub
+ UNION all
+ select key, value from src s0
+ ) unionsrc
+INSERT OVERWRITE TABLE DEST1 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT OVERWRITE TABLE DEST2 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5))
+GROUP BY unionsrc.key, unionsrc.value;
+
+FROM (
+ select key, value from (
+ select 'tst1' as key, cast(count(1) as string) as value, 'tst1' as value2 from src s1
+ UNION all
+ select s2.key as key, s2.value as value, 'tst1' as value2 from src s2) unionsub
+ UNION all
+ select key, value from src s0
+ ) unionsrc
+INSERT OVERWRITE TABLE DEST1 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT OVERWRITE TABLE DEST2 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5))
+GROUP BY unionsrc.key, unionsrc.value;
+
+select * from DEST1;
+select * from DEST2;
+
+explain
+FROM (
+ select key, value from src s0
+ UNION all
+ select key, value from (
+ select 'tst1' as key, cast(count(1) as string) as value, 'tst1' as value2 from src s1
+ UNION all
+ select s2.key as key, s2.value as value, 'tst1' as value2 from src s2) unionsub) unionsrc
+INSERT OVERWRITE TABLE DEST1 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT OVERWRITE TABLE DEST2 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5))
+GROUP BY unionsrc.key, unionsrc.value;
+
+FROM (
+ select key, value from src s0
+ UNION all
+ select key, value from (
+ select 'tst1' as key, cast(count(1) as string) as value, 'tst1' as value2 from src s1
+ UNION all
+ select s2.key as key, s2.value as value, 'tst1' as value2 from src s2) unionsub) unionsrc
+INSERT OVERWRITE TABLE DEST1 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT OVERWRITE TABLE DEST2 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5))
+GROUP BY unionsrc.key, unionsrc.value;
+
+select * from DEST1;
+select * from DEST2;
+
+
+explain
+FROM (
+ select key, value from src s0
+ UNION all
+ select 'tst1' as key, cast(count(1) as string) as value from src s1
+ UNION all
+ select s2.key as key, s2.value as value from src s2) unionsrc
+INSERT OVERWRITE TABLE DEST1 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT OVERWRITE TABLE DEST2 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5))
+GROUP BY unionsrc.key, unionsrc.value;
+
+FROM (
+ select key, value from src s0
+ UNION all
+ select 'tst1' as key, cast(count(1) as string) as value from src s1
+ UNION all
+ select s2.key as key, s2.value as value from src s2) unionsrc
+INSERT OVERWRITE TABLE DEST1 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT OVERWRITE TABLE DEST2 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5))
+GROUP BY unionsrc.key, unionsrc.value;
+
+select * from DEST1;
+select * from DEST2;
+
+explain
+FROM (select 'tst1' as key, cast(count(1) as string) as value from src s1
+ UNION all
+ select s2.key as key, s2.value as value from src s2) unionsrc
+INSERT OVERWRITE TABLE DEST1 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT OVERWRITE TABLE DEST2 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5))
+GROUP BY unionsrc.key, unionsrc.value;
+
+FROM (select 'tst1' as key, cast(count(1) as string) as value from src s1
+ UNION all
+ select s2.key as key, s2.value as value from src s2) unionsrc
+INSERT OVERWRITE TABLE DEST1 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT OVERWRITE TABLE DEST2 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5))
+GROUP BY unionsrc.key, unionsrc.value;
+
+select * from DEST1;
+select * from DEST2;
+
+explain
+FROM (
+ select key, value from (
+ select 'tst1' as key, cast(count(1) as string) as value from src s1
+ UNION all
+ select s2.key as key, s2.value as value from src s2)subq group by key, value
+ )
+ unionsrc
+INSERT OVERWRITE TABLE DEST1 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT OVERWRITE TABLE DEST2 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5))
+GROUP BY unionsrc.key, unionsrc.value;
+
+FROM (
+ select key, value from (
+ select 'tst1' as key, cast(count(1) as string) as value from src s1
+ UNION all
+ select s2.key as key, s2.value as value from src s2)subq group by key, value
+ )
+ unionsrc
+INSERT OVERWRITE TABLE DEST1 SELECT unionsrc.key, COUNT(DISTINCT SUBSTR(unionsrc.value,5)) GROUP BY unionsrc.key
+INSERT OVERWRITE TABLE DEST2 SELECT unionsrc.key, unionsrc.value, COUNT(DISTINCT SUBSTR(unionsrc.value,5))
+GROUP BY unionsrc.key, unionsrc.value;
+
+select * from DEST1;
+select * from DEST2;
\ No newline at end of file