You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by su...@apache.org on 2017/04/12 20:41:28 UTC

hive git commit: HIVE-16328: HoS: more aggressive mapjoin optimization when hive.spark.use.ts.stats.for.mapjoin is true (Chao Sun, reviewed by Xuefu Zhang)

Repository: hive
Updated Branches:
  refs/heads/master 15e2586da -> 7d3338791


HIVE-16328: HoS: more aggressive mapjoin optimization when hive.spark.use.ts.stats.for.mapjoin is true (Chao Sun, reviewed by Xuefu Zhang)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7d333879
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7d333879
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7d333879

Branch: refs/heads/master
Commit: 7d33387916efa2bf410b1593207751dab7064b7a
Parents: 15e2586
Author: Chao Sun <su...@apache.org>
Authored: Wed Apr 12 13:40:24 2017 -0700
Committer: Chao Sun <su...@apache.org>
Committed: Wed Apr 12 13:40:24 2017 -0700

----------------------------------------------------------------------
 .../optimizer/spark/SparkMapJoinOptimizer.java  | 69 +++++++++++++++-----
 1 file changed, 52 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/7d333879/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
index 207f7b3..9243873 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
@@ -24,6 +24,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.Stack;
 
+import com.google.common.collect.Sets;
+import org.apache.hadoop.hive.ql.exec.LateralViewForwardOperator;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.slf4j.Logger;
@@ -80,7 +82,7 @@ public class SparkMapJoinOptimizer implements NodeProcessor {
       return null;
     }
 
-    LOG.info("Check if it can be converted to map join");
+    LOG.info("Check if operator " + joinOp + " can be converted to map join");
     long[] mapJoinInfo = getMapJoinConversionInfo(joinOp, context);
     int mapJoinConversionPos = (int) mapJoinInfo[0];
 
@@ -196,25 +198,40 @@ public class SparkMapJoinOptimizer implements NodeProcessor {
     // max. This table is either the big table or we cannot convert.
     boolean bigTableFound = false;
     boolean useTsStats = context.getConf().getBoolean(HiveConf.ConfVars.SPARK_USE_TS_STATS_FOR_MAPJOIN.varname, false);
-    boolean hasUpstreamSinks = false;
 
-    // Check whether there's any upstream RS.
-    // If so, don't use TS stats because they could be inaccurate.
-    for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
-      Set<ReduceSinkOperator> parentSinks =
-          OperatorUtils.findOperatorsUpstream(parentOp, ReduceSinkOperator.class);
-      parentSinks.remove(parentOp);
-      if (!parentSinks.isEmpty()) {
-        hasUpstreamSinks = true;
+    // If we're using TS's stats for mapjoin optimization, check each branch and see if there's any
+    // upstream operator (e.g., JOIN, LATERAL_VIEW) that can increase output data size.
+    // If so, mark that branch as the big table branch.
+    if (useTsStats) {
+      LOG.debug("Checking map join optimization for operator {} using TS stats", joinOp);
+      for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
+        if (isBigTableBranch(parentOp)) {
+          if (bigTablePosition < 0 && bigTableCandidateSet.contains(pos)) {
+            LOG.debug("Found a big table branch with parent operator {} and position {}", parentOp, pos);
+            bigTablePosition = pos;
+            bigTableFound = true;
+            bigInputStat = new Statistics();
+            bigInputStat.setDataSize(Long.MAX_VALUE);
+          } else {
+            // Either we've found multiple big table branches, or the current branch cannot
+            // be a big table branch. Disable mapjoin for these cases.
+            LOG.debug("Cannot enable map join optimization for operator {}", joinOp);
+            return new long[]{-1, 0, 0};
+          }
+        }
+        pos++;
       }
     }
 
-    // If we are using TS stats and this JOIN has at least one upstream RS, disable MapJoin conversion.
-    if (useTsStats && hasUpstreamSinks) {
-      return new long[]{-1, 0, 0};
-    }
+    pos = 0;
 
     for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
+      // Skip the potential big table identified above
+      if (pos == bigTablePosition) {
+        pos++;
+        continue;
+      }
+
       Statistics currInputStat;
       if (useTsStats) {
         currInputStat = new Statistics();
@@ -255,9 +272,8 @@ public class SparkMapJoinOptimizer implements NodeProcessor {
       }
 
       long inputSize = currInputStat.getDataSize();
-      if ((bigInputStat == null)
-          || ((bigInputStat != null)
-          && (inputSize > bigInputStat.getDataSize()))) {
+
+      if (bigInputStat == null || inputSize > bigInputStat.getDataSize()) {
 
         if (bigTableFound) {
           // cannot convert to map join; we've already chosen a big table
@@ -318,6 +334,25 @@ public class SparkMapJoinOptimizer implements NodeProcessor {
   }
 
   /**
+   * Check whether the branch rooted at 'op' is a potential big table branch: one with a JOIN
+   * or LATERAL_VIEW_FORWARD operator upstream, either of which may increase output data size.
+   * The caller assumes the worst for such a branch and treats it as the big table.
+   *
+   * @param op parent operator of the join; OperatorUtils.findOperatorsUpstream searches from it.
+   * @return True if the branch starting at 'op' is a big table branch. False otherwise.
+   */
+  private boolean isBigTableBranch(Operator<? extends OperatorDesc> op) {
+    for (Class<? extends Operator<? extends OperatorDesc>> clazz :
+        Sets.newHashSet(JoinOperator.class, LateralViewForwardOperator.class)) {
+      Set<? extends Operator<? extends OperatorDesc>> parentSinks = OperatorUtils.findOperatorsUpstream(op, clazz);
+      if (!parentSinks.isEmpty()) { // despite the name, these are the matched JOIN/LVF operators
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
    * Examines this operator and all the connected operators, for mapjoins that will be in the same work.
    * @param parentOp potential big-table parent operator, explore up from this.
    * @param joinOp potential mapjoin operator, explore down from this.