Posted to commits@hive.apache.org by ha...@apache.org on 2016/05/31 20:57:23 UTC

hive git commit: HIVE-13518 : Hive on Tez: Shuffle joins do not choose the right 'big' table. (Vikram Dixit via Gunther Hagleitner)

Repository: hive
Updated Branches:
  refs/heads/branch-2.1 b7759da73 -> 0e5b90d51


HIVE-13518 : Hive on Tez: Shuffle joins do not choose the right 'big' table. (Vikram Dixit via Gunther Hagleitner)

Signed-off-by: Ashutosh Chauhan <ha...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0e5b90d5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0e5b90d5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0e5b90d5

Branch: refs/heads/branch-2.1
Commit: 0e5b90d515572f9559fc663d542552ca0f39c7bc
Parents: b7759da
Author: Ashutosh Chauhan <ha...@apache.org>
Authored: Tue May 31 13:54:59 2016 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Tue May 31 13:57:07 2016 -0700

----------------------------------------------------------------------
 .../hive/ql/optimizer/ConvertJoinMapJoin.java   | 84 +++++++++++---------
 .../apache/hadoop/hive/ql/parse/GenTezWork.java |  3 +-
 .../clientpositive/tez/metadataonly1.q.out      |  2 +-
 .../clientpositive/tez/vectorized_ptf.q.out     |  2 +-
 4 files changed, 49 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
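Context for the patch below: per the commit message, when a map join could not be done and Hive on Tez fell back to a common merge join (the shuffle join case), the big table position was hard-coded to 0, so the largest input was not necessarily chosen as the "big" table. The fix reuses getMapJoinConversionPos(), now taking explicit skipJoinTypeChecks and maxSize parameters, to compute a real position from input sizes, and GenTezWork then reads that stored position. A minimal, illustrative sketch of the selection idea (not the patch itself; the class and method names below are hypothetical):

    // Illustrative only: pick the join input with the largest estimated
    // size as the "big" table. The patched fallback achieves this by calling
    //   getMapJoinConversionPos(joinOp, context, buckets, true, Long.MAX_VALUE)
    // so no candidate is filtered out by the size threshold.
    public class BigTablePositionSketch {
      static int pickBigTablePosition(long[] inputSizes) {
        int bigTablePosition = -1;   // mirrors the -1 "no valid position" convention
        long largestSize = -1L;
        for (int pos = 0; pos < inputSizes.length; pos++) {
          if (inputSizes[pos] > largestSize) {
            largestSize = inputSizes[pos];
            bigTablePosition = pos;
          }
        }
        return bigTablePosition;
      }
    }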


http://git-wip-us.apache.org/repos/asf/hive/blob/0e5b90d5/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index b35f075..387f47d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -87,6 +87,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     OptimizeTezProcContext context = (OptimizeTezProcContext) procCtx;
 
     JoinOperator joinOp = (JoinOperator) nd;
+    long maxSize = context.conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
 
     TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);
     if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) {
@@ -110,7 +111,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
       numBuckets = 1;
     }
     LOG.info("Estimated number of buckets " + numBuckets);
-    int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets);
+    int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets, false, maxSize);
     if (mapJoinConversionPos < 0) {
       Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx);
       if (retval == null) {
@@ -134,7 +135,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     // check if we can convert to map join no bucket scaling.
     LOG.info("Convert to non-bucketed map join");
     if (numBuckets != 1) {
-      mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, 1);
+      mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, 1, false, maxSize);
     }
     if (mapJoinConversionPos < 0) {
       // we are just converting to a common merge join operator. The shuffle
@@ -359,7 +360,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
       // MapRecordProcessor and ReduceRecordProcessor with respect to the sources.
       @SuppressWarnings({"rawtypes","unchecked"})
       Set<ReduceSinkOperator> set =
-          OperatorUtils.findOperatorsUpstream((Collection)parentOp.getParentOperators(),
+          OperatorUtils.findOperatorsUpstream(parentOp.getParentOperators(),
               ReduceSinkOperator.class);
       if (size < 0) {
         size = set.size();
@@ -505,44 +506,42 @@ public class ConvertJoinMapJoin implements NodeProcessor {
   }
 
   public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext context,
-      int buckets) throws SemanticException {
-    /*
-     * HIVE-9038: Join tests fail in tez when we have more than 1 join on the same key and there is
-     * an outer join down the join tree that requires filterTag. We disable this conversion to map
-     * join here now. We need to emulate the behavior of HashTableSinkOperator as in MR or create a
-     * new operation to be able to support this. This seems like a corner case enough to special
-     * case this for now.
-     */
-    if (joinOp.getConf().getConds().length > 1) {
-      boolean hasOuter = false;
-      for (JoinCondDesc joinCondDesc : joinOp.getConf().getConds()) {
-        switch (joinCondDesc.getType()) {
-        case JoinDesc.INNER_JOIN:
-        case JoinDesc.LEFT_SEMI_JOIN:
-        case JoinDesc.UNIQUE_JOIN:
-          hasOuter = false;
-          break;
-
-        case JoinDesc.FULL_OUTER_JOIN:
-        case JoinDesc.LEFT_OUTER_JOIN:
-        case JoinDesc.RIGHT_OUTER_JOIN:
-          hasOuter = true;
-          break;
-
-        default:
-          throw new SemanticException("Unknown join type " + joinCondDesc.getType());
+      int buckets, boolean skipJoinTypeChecks, long maxSize) throws SemanticException {
+    if (!skipJoinTypeChecks) {
+      /*
+       * HIVE-9038: Join tests fail in tez when we have more than 1 join on the same key and there is
+       * an outer join down the join tree that requires filterTag. We disable this conversion to map
+       * join here now. We need to emulate the behavior of HashTableSinkOperator as in MR or create a
+       * new operation to be able to support this. This seems like a corner case enough to special
+       * case this for now.
+       */
+      if (joinOp.getConf().getConds().length > 1) {
+        boolean hasOuter = false;
+        for (JoinCondDesc joinCondDesc : joinOp.getConf().getConds()) {
+          switch (joinCondDesc.getType()) {
+          case JoinDesc.INNER_JOIN:
+          case JoinDesc.LEFT_SEMI_JOIN:
+          case JoinDesc.UNIQUE_JOIN:
+            hasOuter = false;
+            break;
+
+          case JoinDesc.FULL_OUTER_JOIN:
+          case JoinDesc.LEFT_OUTER_JOIN:
+          case JoinDesc.RIGHT_OUTER_JOIN:
+            hasOuter = true;
+            break;
+
+          default:
+            throw new SemanticException("Unknown join type " + joinCondDesc.getType());
+          }
+        }
+        if (hasOuter) {
+          return -1;
         }
-      }
-      if (hasOuter) {
-        return -1;
       }
     }
     Set<Integer> bigTableCandidateSet =
         MapJoinProcessor.getBigTableCandidates(joinOp.getConf().getConds());
-
-    long maxSize = context.conf.getLongVar(
-        HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
-
     int bigTablePosition = -1;
     // big input cumulative row count
     long bigInputCumulativeCardinality = -1L;
@@ -576,7 +575,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
           // on size and there's another one that's bigger.
           return -1;
         }
-        
+
         if (inputSize/buckets > maxSize) {
           if (!bigTableCandidateSet.contains(pos)) {
             // can't use the current table as the big table, but it's too
@@ -826,7 +825,9 @@ public class ConvertJoinMapJoin implements NodeProcessor {
     // Since we don't have big table index yet, must start with estimate of numReducers
     int numReducers = estimateNumBuckets(joinOp, false);
     LOG.info("Try dynamic partitioned hash join with estimated " + numReducers + " reducers");
-    int bigTablePos = getMapJoinConversionPos(joinOp, context, numReducers);
+    int bigTablePos = getMapJoinConversionPos(joinOp, context, numReducers, false,
+                          context.conf.getLongVar(
+                              HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD));
     if (bigTablePos >= 0) {
       // Now that we have the big table index, get real numReducers value based on big table RS
       ReduceSinkOperator bigTableParentRS =
@@ -869,9 +870,14 @@ public class ConvertJoinMapJoin implements NodeProcessor {
       }
     }
 
+    int pos = getMapJoinConversionPos(joinOp, context, estimateNumBuckets(joinOp, false),
+                  true, Long.MAX_VALUE);
+    if (pos < 0) {
+      LOG.info("Could not get a valid join position. Defaulting to position 0");
+      pos = 0;
+    }
     // we are just converting to a common merge join operator. The shuffle
     // join in map-reduce case.
-    int pos = 0; // it doesn't matter which position we use in this case.
     LOG.info("Fallback to common merge join operator");
     convertJoinSMBJoin(joinOp, context, pos, 0, false);
   }
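
To summarize the new getMapJoinConversionPos() contract, here are the two kinds of call sites, condensed from the hunks above (no new names introduced): map join conversion keeps the join-type checks and the size threshold, while the merge join fallback skips both so that a position is always derived from input sizes.

    long maxSize = context.conf.getLongVar(
        HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
    // Map join conversion: enforce join-type checks and the size threshold.
    int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets, false, maxSize);
    // Merge join fallback: skip the checks and use Long.MAX_VALUE so the
    // largest input always qualifies as the big table.
    int pos = getMapJoinConversionPos(joinOp, context, estimateNumBuckets(joinOp, false),
                  true, Long.MAX_VALUE);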

http://git-wip-us.apache.org/repos/asf/hive/blob/0e5b90d5/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
index 46d279e..461ba37 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
@@ -168,7 +168,8 @@ public class GenTezWork implements NodeProcessor {
           getParentFromStack(context.currentMergeJoinOperator, stack);
       // Set the big table position. Both the reduce work and merge join operator
       // should be set with the same value.
-      int pos = context.currentMergeJoinOperator.getTagForOperator(parentOp);
+//      int pos = context.currentMergeJoinOperator.getTagForOperator(parentOp);
+      int pos = context.currentMergeJoinOperator.getConf().getBigTablePosition();
       work.setTag(pos);
       context.currentMergeJoinOperator.getConf().setBigTablePosition(pos);
       tezWork.setVertexType(work, VertexType.MULTI_INPUT_UNINITIALIZED_EDGES);
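
In GenTezWork, the tag for the reduce work is now read from the big table position that ConvertJoinMapJoin stored on the merge join descriptor, rather than re-derived from the operator graph, so the two stay consistent. Condensed from the hunk above:

    // The position chosen in ConvertJoinMapJoin (stored on the merge join's
    // desc) is now the single source of truth for the vertex tag.
    int pos = context.currentMergeJoinOperator.getConf().getBigTablePosition();
    work.setTag(pos);
    context.currentMergeJoinOperator.getConf().setBigTablePosition(pos); // re-stores the same value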

http://git-wip-us.apache.org/repos/asf/hive/blob/0e5b90d5/ql/src/test/results/clientpositive/tez/metadataonly1.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/tez/metadataonly1.q.out b/ql/src/test/results/clientpositive/tez/metadataonly1.q.out
index 15f5ed5..4075b81 100644
--- a/ql/src/test/results/clientpositive/tez/metadataonly1.q.out
+++ b/ql/src/test/results/clientpositive/tez/metadataonly1.q.out
@@ -772,7 +772,7 @@ STAGE PLANS:
                 keys:
                   0 _col0 (type: string)
                   1 _col0 (type: string)
-                Position of Big Table: 0
+                Position of Big Table: 1
                 Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
                 Group By Operator
                   aggregations: count()

http://git-wip-us.apache.org/repos/asf/hive/blob/0e5b90d5/ql/src/test/results/clientpositive/tez/vectorized_ptf.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/tez/vectorized_ptf.q.out b/ql/src/test/results/clientpositive/tez/vectorized_ptf.q.out
index 0a62262..0435d28 100644
--- a/ql/src/test/results/clientpositive/tez/vectorized_ptf.q.out
+++ b/ql/src/test/results/clientpositive/tez/vectorized_ptf.q.out
@@ -2117,7 +2117,7 @@ STAGE PLANS:
                   0 p_partkey (type: int)
                   1 _col0 (type: int)
                 outputColumnNames: _col12, _col13, _col14, _col15, _col16, _col17, _col18, _col19, _col20
-                Position of Big Table: 0
+                Position of Big Table: 1
                 Statistics: Num rows: 28 Data size: 17646 Basic stats: COMPLETE Column stats: NONE
                 Select Operator
                   expressions: _col12 (type: int), _col13 (type: string), _col14 (type: string), _col15 (type: string), _col16 (type: string), _col17 (type: int), _col18 (type: string), _col19 (type: double), _col20 (type: string)
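
The two golden-file updates above follow from the new selection: with a real position computed for the merge join fallback, these plans now report "Position of Big Table: 1" where the hard-coded default of 0 was printed before.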