You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this copy.
Posted to commits@hive.apache.org by ha...@apache.org on 2016/05/31 20:57:23 UTC
hive git commit: HIVE-13518 : Hive on Tez: Shuffle joins do not choose the right 'big' table. (Vikram Dixit via Gunther Hagleitner)
Repository: hive
Updated Branches:
refs/heads/branch-2.1 b7759da73 -> 0e5b90d51
HIVE-13518 : Hive on Tez: Shuffle joins do not choose the right 'big' table. (Vikram Dixit via Gunther Hagleitner)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0e5b90d5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0e5b90d5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0e5b90d5
Branch: refs/heads/branch-2.1
Commit: 0e5b90d515572f9559fc663d542552ca0f39c7bc
Parents: b7759da
Author: Ashutosh Chauhan <ha...@apache.org>
Authored: Tue May 31 13:54:59 2016 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Tue May 31 13:57:07 2016 -0700
----------------------------------------------------------------------
.../hive/ql/optimizer/ConvertJoinMapJoin.java | 84 +++++++++++---------
.../apache/hadoop/hive/ql/parse/GenTezWork.java | 3 +-
.../clientpositive/tez/metadataonly1.q.out | 2 +-
.../clientpositive/tez/vectorized_ptf.q.out | 2 +-
4 files changed, 49 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/0e5b90d5/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index b35f075..387f47d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -87,6 +87,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
OptimizeTezProcContext context = (OptimizeTezProcContext) procCtx;
JoinOperator joinOp = (JoinOperator) nd;
+ long maxSize = context.conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);
if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) {
@@ -110,7 +111,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
numBuckets = 1;
}
LOG.info("Estimated number of buckets " + numBuckets);
- int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets);
+ int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets, false, maxSize);
if (mapJoinConversionPos < 0) {
Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx);
if (retval == null) {
@@ -134,7 +135,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
// check if we can convert to map join no bucket scaling.
LOG.info("Convert to non-bucketed map join");
if (numBuckets != 1) {
- mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, 1);
+ mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, 1, false, maxSize);
}
if (mapJoinConversionPos < 0) {
// we are just converting to a common merge join operator. The shuffle
@@ -359,7 +360,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
// MapRecordProcessor and ReduceRecordProcessor with respect to the sources.
@SuppressWarnings({"rawtypes","unchecked"})
Set<ReduceSinkOperator> set =
- OperatorUtils.findOperatorsUpstream((Collection)parentOp.getParentOperators(),
+ OperatorUtils.findOperatorsUpstream(parentOp.getParentOperators(),
ReduceSinkOperator.class);
if (size < 0) {
size = set.size();
@@ -505,44 +506,42 @@ public class ConvertJoinMapJoin implements NodeProcessor {
}
public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext context,
- int buckets) throws SemanticException {
- /*
- * HIVE-9038: Join tests fail in tez when we have more than 1 join on the same key and there is
- * an outer join down the join tree that requires filterTag. We disable this conversion to map
- * join here now. We need to emulate the behavior of HashTableSinkOperator as in MR or create a
- * new operation to be able to support this. This seems like a corner case enough to special
- * case this for now.
- */
- if (joinOp.getConf().getConds().length > 1) {
- boolean hasOuter = false;
- for (JoinCondDesc joinCondDesc : joinOp.getConf().getConds()) {
- switch (joinCondDesc.getType()) {
- case JoinDesc.INNER_JOIN:
- case JoinDesc.LEFT_SEMI_JOIN:
- case JoinDesc.UNIQUE_JOIN:
- hasOuter = false;
- break;
-
- case JoinDesc.FULL_OUTER_JOIN:
- case JoinDesc.LEFT_OUTER_JOIN:
- case JoinDesc.RIGHT_OUTER_JOIN:
- hasOuter = true;
- break;
-
- default:
- throw new SemanticException("Unknown join type " + joinCondDesc.getType());
+ int buckets, boolean skipJoinTypeChecks, long maxSize) throws SemanticException {
+ if (!skipJoinTypeChecks) {
+ /*
+ * HIVE-9038: Join tests fail in tez when we have more than 1 join on the same key and there is
+ * an outer join down the join tree that requires filterTag. We disable this conversion to map
+ * join here now. We need to emulate the behavior of HashTableSinkOperator as in MR or create a
+ * new operation to be able to support this. This seems like a corner case enough to special
+ * case this for now.
+ */
+ if (joinOp.getConf().getConds().length > 1) {
+ boolean hasOuter = false;
+ for (JoinCondDesc joinCondDesc : joinOp.getConf().getConds()) {
+ switch (joinCondDesc.getType()) {
+ case JoinDesc.INNER_JOIN:
+ case JoinDesc.LEFT_SEMI_JOIN:
+ case JoinDesc.UNIQUE_JOIN:
+ hasOuter = false;
+ break;
+
+ case JoinDesc.FULL_OUTER_JOIN:
+ case JoinDesc.LEFT_OUTER_JOIN:
+ case JoinDesc.RIGHT_OUTER_JOIN:
+ hasOuter = true;
+ break;
+
+ default:
+ throw new SemanticException("Unknown join type " + joinCondDesc.getType());
+ }
+ }
+ if (hasOuter) {
+ return -1;
}
- }
- if (hasOuter) {
- return -1;
}
}
Set<Integer> bigTableCandidateSet =
MapJoinProcessor.getBigTableCandidates(joinOp.getConf().getConds());
-
- long maxSize = context.conf.getLongVar(
- HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
-
int bigTablePosition = -1;
// big input cumulative row count
long bigInputCumulativeCardinality = -1L;
@@ -576,7 +575,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
// on size and there's another one that's bigger.
return -1;
}
-
+
if (inputSize/buckets > maxSize) {
if (!bigTableCandidateSet.contains(pos)) {
// can't use the current table as the big table, but it's too
@@ -826,7 +825,9 @@ public class ConvertJoinMapJoin implements NodeProcessor {
// Since we don't have big table index yet, must start with estimate of numReducers
int numReducers = estimateNumBuckets(joinOp, false);
LOG.info("Try dynamic partitioned hash join with estimated " + numReducers + " reducers");
- int bigTablePos = getMapJoinConversionPos(joinOp, context, numReducers);
+ int bigTablePos = getMapJoinConversionPos(joinOp, context, numReducers, false,
+ context.conf.getLongVar(
+ HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD));
if (bigTablePos >= 0) {
// Now that we have the big table index, get real numReducers value based on big table RS
ReduceSinkOperator bigTableParentRS =
@@ -869,9 +870,14 @@ public class ConvertJoinMapJoin implements NodeProcessor {
}
}
+ int pos = getMapJoinConversionPos(joinOp, context, estimateNumBuckets(joinOp, false),
+ true, Long.MAX_VALUE);
+ if (pos < 0) {
+ LOG.info("Could not get a valid join position. Defaulting to position 0");
+ pos = 0;
+ }
// we are just converting to a common merge join operator. The shuffle
// join in map-reduce case.
- int pos = 0; // it doesn't matter which position we use in this case.
LOG.info("Fallback to common merge join operator");
convertJoinSMBJoin(joinOp, context, pos, 0, false);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0e5b90d5/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
index 46d279e..461ba37 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
@@ -168,7 +168,8 @@ public class GenTezWork implements NodeProcessor {
getParentFromStack(context.currentMergeJoinOperator, stack);
// Set the big table position. Both the reduce work and merge join operator
// should be set with the same value.
- int pos = context.currentMergeJoinOperator.getTagForOperator(parentOp);
+// int pos = context.currentMergeJoinOperator.getTagForOperator(parentOp);
+ int pos = context.currentMergeJoinOperator.getConf().getBigTablePosition();
work.setTag(pos);
context.currentMergeJoinOperator.getConf().setBigTablePosition(pos);
tezWork.setVertexType(work, VertexType.MULTI_INPUT_UNINITIALIZED_EDGES);
http://git-wip-us.apache.org/repos/asf/hive/blob/0e5b90d5/ql/src/test/results/clientpositive/tez/metadataonly1.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/tez/metadataonly1.q.out b/ql/src/test/results/clientpositive/tez/metadataonly1.q.out
index 15f5ed5..4075b81 100644
--- a/ql/src/test/results/clientpositive/tez/metadataonly1.q.out
+++ b/ql/src/test/results/clientpositive/tez/metadataonly1.q.out
@@ -772,7 +772,7 @@ STAGE PLANS:
keys:
0 _col0 (type: string)
1 _col0 (type: string)
- Position of Big Table: 0
+ Position of Big Table: 1
Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
Group By Operator
aggregations: count()
http://git-wip-us.apache.org/repos/asf/hive/blob/0e5b90d5/ql/src/test/results/clientpositive/tez/vectorized_ptf.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/tez/vectorized_ptf.q.out b/ql/src/test/results/clientpositive/tez/vectorized_ptf.q.out
index 0a62262..0435d28 100644
--- a/ql/src/test/results/clientpositive/tez/vectorized_ptf.q.out
+++ b/ql/src/test/results/clientpositive/tez/vectorized_ptf.q.out
@@ -2117,7 +2117,7 @@ STAGE PLANS:
0 p_partkey (type: int)
1 _col0 (type: int)
outputColumnNames: _col12, _col13, _col14, _col15, _col16, _col17, _col18, _col19, _col20
- Position of Big Table: 0
+ Position of Big Table: 1
Statistics: Num rows: 28 Data size: 17646 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: _col12 (type: int), _col13 (type: string), _col14 (type: string), _col15 (type: string), _col16 (type: string), _col17 (type: int), _col18 (type: string), _col19 (type: double), _col20 (type: string)