You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2015/09/16 21:18:27 UTC
[09/50] [abbrv] hive git commit: HIVE-11605: Incorrect results with bucket map join in tez. (Vikram Dixit K, reviewed by Sergey Shelukhin)
HIVE-11605: Incorrect results with bucket map join in tez. (Vikram Dixit K, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4ea8e296
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4ea8e296
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4ea8e296
Branch: refs/heads/spark
Commit: 4ea8e29619eb0bbb02e3f7c09ffc9d44bf4cdfef
Parents: 594e25a
Author: vikram <vi...@hortonworks.com>
Authored: Thu Sep 10 13:13:56 2015 -0700
Committer: vikram <vi...@hortonworks.com>
Committed: Thu Sep 10 13:30:23 2015 -0700
----------------------------------------------------------------------
.../hive/ql/optimizer/ConvertJoinMapJoin.java | 18 ++-
.../ql/optimizer/ReduceSinkMapJoinProc.java | 8 +-
.../clientpositive/bucket_map_join_tez1.q | 9 ++
.../spark/bucket_map_join_tez1.q.out | 131 +++++++++++++++++++
.../tez/bucket_map_join_tez1.q.out | 123 +++++++++++++++++
5 files changed, 280 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/4ea8e296/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index e3acdfc..8ea1879 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -375,13 +375,13 @@ public class ConvertJoinMapJoin implements NodeProcessor {
}
ReduceSinkOperator rsOp = (ReduceSinkOperator) parentOp;
if (checkColEquality(rsOp.getParentOperators().get(0).getOpTraits().getSortCols(), rsOp
- .getOpTraits().getSortCols(), rsOp.getColumnExprMap(), tezBucketJoinProcCtx) == false) {
+ .getOpTraits().getSortCols(), rsOp.getColumnExprMap(), tezBucketJoinProcCtx, false) == false) {
LOG.info("We cannot convert to SMB because the sort column names do not match.");
return false;
}
if (checkColEquality(rsOp.getParentOperators().get(0).getOpTraits().getBucketColNames(), rsOp
- .getOpTraits().getBucketColNames(), rsOp.getColumnExprMap(), tezBucketJoinProcCtx)
+ .getOpTraits().getBucketColNames(), rsOp.getColumnExprMap(), tezBucketJoinProcCtx, true)
== false) {
LOG.info("We cannot convert to SMB because bucket column names do not match.");
return false;
@@ -428,7 +428,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
int numBuckets = parentOfParent.getOpTraits().getNumBuckets();
// all keys matched.
if (checkColEquality(grandParentColNames, parentColNames, rs.getColumnExprMap(),
- tezBucketJoinProcCtx) == false) {
+ tezBucketJoinProcCtx, true) == false) {
LOG.info("No info available to check for bucket map join. Cannot convert");
return false;
}
@@ -446,7 +446,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
private boolean checkColEquality(List<List<String>> grandParentColNames,
List<List<String>> parentColNames, Map<String, ExprNodeDesc> colExprMap,
- TezBucketJoinProcCtx tezBucketJoinProcCtx) {
+ TezBucketJoinProcCtx tezBucketJoinProcCtx, boolean strict) {
if ((grandParentColNames == null) || (parentColNames == null)) {
return false;
@@ -479,7 +479,15 @@ public class ConvertJoinMapJoin implements NodeProcessor {
}
if (colCount == parentColNames.get(0).size()) {
- return true;
+ if (strict) {
+ if (colCount == listBucketCols.size()) {
+ return true;
+ } else {
+ return false;
+ }
+ } else {
+ return true;
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/4ea8e296/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
index b546838..71c766f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
@@ -226,10 +226,6 @@ public class ReduceSinkMapJoinProc implements NodeProcessor {
int numBuckets = -1;
EdgeType edgeType = EdgeType.BROADCAST_EDGE;
if (joinConf.isBucketMapJoin()) {
-
- // disable auto parallelism for bucket map joins
- parentRS.getConf().setReducerTraits(EnumSet.of(FIXED));
-
numBuckets = (Integer) joinConf.getBigTableBucketNumMapping().values().toArray()[0];
/*
* Here, we can be in one of 4 states.
@@ -273,6 +269,10 @@ public class ReduceSinkMapJoinProc implements NodeProcessor {
} else if (mapJoinOp.getConf().isDynamicPartitionHashJoin()) {
edgeType = EdgeType.CUSTOM_SIMPLE_EDGE;
}
+ if (edgeType == EdgeType.CUSTOM_EDGE) {
+ // disable auto parallelism for bucket map joins
+ parentRS.getConf().setReducerTraits(EnumSet.of(FIXED));
+ }
TezEdgeProperty edgeProp = new TezEdgeProperty(null, edgeType, numBuckets);
if (mapJoinWork != null) {
http://git-wip-us.apache.org/repos/asf/hive/blob/4ea8e296/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q b/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q
index 4a7d63e..0f9dd6d 100644
--- a/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q
+++ b/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q
@@ -30,6 +30,15 @@ explain
select a.key, a.value, b.value
from tab a join tab_part b on a.key = b.key;
+explain
+select count(*)
+from
+(select distinct key, value from tab_part) a join tab b on a.key = b.key;
+
+select count(*)
+from
+(select distinct key, value from tab_part) a join tab b on a.key = b.key;
+
-- one side is really bucketed. srcbucket_mapjoin is not really a bucketed table.
-- In this case the sub-query is chosen as the big table.
explain
http://git-wip-us.apache.org/repos/asf/hive/blob/4ea8e296/ql/src/test/results/clientpositive/spark/bucket_map_join_tez1.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/bucket_map_join_tez1.q.out b/ql/src/test/results/clientpositive/spark/bucket_map_join_tez1.q.out
index 65bded2..34ddc90 100644
--- a/ql/src/test/results/clientpositive/spark/bucket_map_join_tez1.q.out
+++ b/ql/src/test/results/clientpositive/spark/bucket_map_join_tez1.q.out
@@ -183,6 +183,137 @@ STAGE PLANS:
Processor Tree:
ListSink
+PREHOOK: query: explain
+select count(*)
+from
+(select distinct key, value from tab_part) a join tab b on a.key = b.key
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(*)
+from
+(select distinct key, value from tab_part) a join tab b on a.key = b.key
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-2 is a root stage
+ Stage-1 depends on stages: Stage-2
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-2
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 4
+ Map Operator Tree:
+ TableScan
+ alias: b
+ Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE
+ Spark HashTable Sink Operator
+ keys:
+ 0 _col0 (type: int)
+ 1 key (type: int)
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-1
+ Spark
+ Edges:
+ Reducer 2 <- Map 1 (GROUP, 2)
+ Reducer 3 <- Reducer 2 (GROUP, 1)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: tab_part
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ keys: key (type: int), value (type: string)
+ mode: hash
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: int), _col1 (type: string)
+ sort order: ++
+ Map-reduce partition columns: _col0 (type: int), _col1 (type: string)
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Reducer 2
+ Local Work:
+ Map Reduce Local Work
+ Reduce Operator Tree:
+ Group By Operator
+ keys: KEY._col0 (type: int), KEY._col1 (type: string)
+ mode: mergepartial
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 125 Data size: 1328 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: _col0 (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 125 Data size: 1328 Basic stats: COMPLETE Column stats: NONE
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col0 (type: int)
+ 1 key (type: int)
+ input vertices:
+ 1 Map 4
+ Statistics: Num rows: 137 Data size: 1460 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ aggregations: count()
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: bigint)
+ Reducer 3
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: count(VALUE._col0)
+ mode: mergepartial
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: select count(*)
+from
+(select distinct key, value from tab_part) a join tab b on a.key = b.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab
+PREHOOK: Input: default@tab@ds=2008-04-08
+PREHOOK: Input: default@tab_part
+PREHOOK: Input: default@tab_part@ds=2008-04-08
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*)
+from
+(select distinct key, value from tab_part) a join tab b on a.key = b.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab
+POSTHOOK: Input: default@tab@ds=2008-04-08
+POSTHOOK: Input: default@tab_part
+POSTHOOK: Input: default@tab_part@ds=2008-04-08
+#### A masked pattern was here ####
+242
PREHOOK: query: -- one side is really bucketed. srcbucket_mapjoin is not really a bucketed table.
-- In this case the sub-query is chosen as the big table.
explain
http://git-wip-us.apache.org/repos/asf/hive/blob/4ea8e296/ql/src/test/results/clientpositive/tez/bucket_map_join_tez1.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/tez/bucket_map_join_tez1.q.out b/ql/src/test/results/clientpositive/tez/bucket_map_join_tez1.q.out
index 61c197f..8338672 100644
--- a/ql/src/test/results/clientpositive/tez/bucket_map_join_tez1.q.out
+++ b/ql/src/test/results/clientpositive/tez/bucket_map_join_tez1.q.out
@@ -178,6 +178,129 @@ STAGE PLANS:
Processor Tree:
ListSink
+PREHOOK: query: explain
+select count(*)
+from
+(select distinct key, value from tab_part) a join tab b on a.key = b.key
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select count(*)
+from
+(select distinct key, value from tab_part) a join tab b on a.key = b.key
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Tez
+ Edges:
+ Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (BROADCAST_EDGE)
+ Reducer 3 <- Reducer 2 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: tab_part
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Group By Operator
+ keys: key (type: int), value (type: string)
+ mode: hash
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: int), _col1 (type: string)
+ sort order: ++
+ Map-reduce partition columns: _col0 (type: int), _col1 (type: string)
+ Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+ Map 4
+ Map Operator Tree:
+ TableScan
+ alias: b
+ Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: key is not null (type: boolean)
+ Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: key (type: int)
+ sort order: +
+ Map-reduce partition columns: key (type: int)
+ Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE
+ Reducer 2
+ Reduce Operator Tree:
+ Group By Operator
+ keys: KEY._col0 (type: int), KEY._col1 (type: string)
+ mode: mergepartial
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 125 Data size: 1328 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: _col0 (type: int)
+ outputColumnNames: _col0
+ Statistics: Num rows: 125 Data size: 1328 Basic stats: COMPLETE Column stats: NONE
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ keys:
+ 0 _col0 (type: int)
+ 1 key (type: int)
+ input vertices:
+ 1 Map 4
+ Statistics: Num rows: 137 Data size: 1460 Basic stats: COMPLETE Column stats: NONE
+ HybridGraceHashJoin: true
+ Group By Operator
+ aggregations: count()
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ sort order:
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ value expressions: _col0 (type: bigint)
+ Reducer 3
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations: count(VALUE._col0)
+ mode: mergepartial
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: select count(*)
+from
+(select distinct key, value from tab_part) a join tab b on a.key = b.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tab
+PREHOOK: Input: default@tab@ds=2008-04-08
+PREHOOK: Input: default@tab_part
+PREHOOK: Input: default@tab_part@ds=2008-04-08
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*)
+from
+(select distinct key, value from tab_part) a join tab b on a.key = b.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tab
+POSTHOOK: Input: default@tab@ds=2008-04-08
+POSTHOOK: Input: default@tab_part
+POSTHOOK: Input: default@tab_part@ds=2008-04-08
+#### A masked pattern was here ####
+242
PREHOOK: query: -- one side is really bucketed. srcbucket_mapjoin is not really a bucketed table.
-- In this case the sub-query is chosen as the big table.
explain