You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by li...@apache.org on 2016/12/19 03:21:32 UTC

hive git commit: HIVE-15428: HoS DPP doesn't remove cyclic dependency (Rui Li reviewed by Chao Sun)

Repository: hive
Updated Branches:
  refs/heads/master 65940caca -> c6a7edd5a


HIVE-15428: HoS DPP doesn't remove cyclic dependency (Rui Li reviewed by Chao Sun)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c6a7edd5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c6a7edd5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c6a7edd5

Branch: refs/heads/master
Commit: c6a7edd5a299cfca2e35ddb11edabc2ad126f2ad
Parents: 65940ca
Author: Rui Li <li...@apache.org>
Authored: Mon Dec 19 11:21:23 2016 +0800
Committer: Rui Li <li...@apache.org>
Committed: Mon Dec 19 11:21:23 2016 +0800

----------------------------------------------------------------------
 .../SparkRemoveDynamicPruningBySize.java        |  12 +-
 .../hive/ql/parse/spark/GenSparkUtils.java      |  19 ++++
 .../hive/ql/parse/spark/SparkCompiler.java      | 112 +++++++++++++++++++
 3 files changed, 133 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/c6a7edd5/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java
index a6bf3af..c41a0c8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java
@@ -23,12 +23,12 @@ import java.util.Stack;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.spark.GenSparkUtils;
 import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext;
 import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
 
@@ -54,15 +54,7 @@ public class SparkRemoveDynamicPruningBySize implements NodeProcessor {
 
     if (desc.getStatistics().getDataSize() > context.getConf()
         .getLongVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE)) {
-      Operator<?> child = op;
-      Operator<?> curr = op;
-
-      while (curr.getChildOperators().size() <= 1) {
-        child = curr;
-        curr = curr.getParentOperators().get(0);
-      }
-
-      curr.removeChild(child);
+      GenSparkUtils.removeBranch(op);
       // at this point we've found the fork in the op pipeline that has the pruning as a child plan.
       LOG.info("Disabling dynamic pruning for: "
           + desc.getTableScan().getName()

http://git-wip-us.apache.org/repos/asf/hive/blob/c6a7edd5/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index 8a85574..7b2b3c0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -596,4 +596,23 @@ public class GenSparkUtils {
       findRoots(p, ops);
     }
   }
+
+  /**
+   * Remove the branch that contains the specified operator by cutting it at the nearest upstream
+   * fork. Do nothing if there's no fork, i.e. no operator on the way up has more than one child.
+   */
+  public static void removeBranch(Operator<?> op) {
+    Operator<?> child = op;
+    Operator<?> curr = op;
+
+    while (curr.getChildOperators().size() <= 1) {
+      child = curr;
+      if (curr.getParentOperators() == null || curr.getParentOperators().isEmpty()) {
+        return;
+      }
+      curr = curr.getParentOperators().get(0);
+    }
+
+    curr.removeChild(child);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/c6a7edd5/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
index baf77c7..71528e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
@@ -20,11 +20,13 @@ package org.apache.hadoop.hive.ql.parse.spark;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Stack;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
@@ -73,6 +75,7 @@ import org.apache.hadoop.hive.ql.optimizer.spark.CombineEquivalentWorkResolver;
 import org.apache.hadoop.hive.ql.optimizer.spark.SetSparkReducerParallelism;
 import org.apache.hadoop.hive.ql.optimizer.spark.SparkJoinHintOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.spark.SparkJoinOptimizer;
+import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
 import org.apache.hadoop.hive.ql.optimizer.spark.SparkReduceSinkMapJoinProc;
 import org.apache.hadoop.hive.ql.optimizer.spark.SparkSkewJoinResolver;
 import org.apache.hadoop.hive.ql.optimizer.spark.SplitSparkWorkResolver;
@@ -116,9 +119,118 @@ public class SparkCompiler extends TaskCompiler {
     // Run Join releated optimizations
     runJoinOptimizations(procCtx);
 
+    // Remove cyclic dependencies for DPP
+    runCycleAnalysisForPartitionPruning(procCtx);
+
     PERF_LOGGER.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_OPERATOR_TREE);
   }
 
+  private void runCycleAnalysisForPartitionPruning(OptimizeSparkProcContext procCtx) {
+    if (!conf.getBoolVar(HiveConf.ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING)) {
+      return;
+    }
+
+    boolean cycleFree = false;
+    while (!cycleFree) {
+      cycleFree = true;
+      Set<Set<Operator<?>>> components = getComponents(procCtx);
+      for (Set<Operator<?>> component : components) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Component: ");
+          for (Operator<?> co : component) {
+            LOG.debug("Operator: " + co.getName() + ", " + co.getIdentifier());
+          }
+        }
+        if (component.size() != 1) {
+          LOG.info("Found cycle in operator plan...");
+          cycleFree = false;
+          removeDPPOperator(component, procCtx);
+          break;
+        }
+      }
+      LOG.info("Cycle free: " + cycleFree);
+    }
+  }
+
+  private void removeDPPOperator(Set<Operator<?>> component, OptimizeSparkProcContext context) {
+    SparkPartitionPruningSinkOperator toRemove = null;
+    for (Operator<?> o : component) {
+      if (o instanceof SparkPartitionPruningSinkOperator) {
+        // we want to remove the DPP with bigger data size
+        if (toRemove == null
+            || o.getConf().getStatistics().getDataSize() > toRemove.getConf().getStatistics()
+            .getDataSize()) {
+          toRemove = (SparkPartitionPruningSinkOperator) o;
+        }
+      }
+    }
+
+    if (toRemove == null) {
+      return;
+    }
+
+    GenSparkUtils.removeBranch(toRemove);
+    // removeBranch has cut the pruning branch off at the fork in the op pipeline, if one exists.
+    LOG.info("Disabling dynamic pruning for: "
+        + toRemove.getConf().getTableScan().toString() + ". Needed to break cyclic dependency");
+  }
+
+  // Tarjan's strongly connected components algorithm; a component of size > 1 indicates a cycle.
+  private Set<Set<Operator<?>>> getComponents(OptimizeSparkProcContext procCtx) {
+    AtomicInteger index = new AtomicInteger();
+    Map<Operator<?>, Integer> indexes = new HashMap<Operator<?>, Integer>();
+    Map<Operator<?>, Integer> lowLinks = new HashMap<Operator<?>, Integer>();
+    Stack<Operator<?>> nodes = new Stack<Operator<?>>();
+    Set<Set<Operator<?>>> components = new HashSet<Set<Operator<?>>>();
+
+    for (Operator<?> o : procCtx.getParseContext().getTopOps().values()) {
+      if (!indexes.containsKey(o)) {
+        connect(o, index, nodes, indexes, lowLinks, components);
+      }
+    }
+    return components;
+  }
+
+  private void connect(Operator<?> o, AtomicInteger index, Stack<Operator<?>> nodes,
+      Map<Operator<?>, Integer> indexes, Map<Operator<?>, Integer> lowLinks,
+      Set<Set<Operator<?>>> components) {
+
+    indexes.put(o, index.get());
+    lowLinks.put(o, index.get());
+    index.incrementAndGet();
+    nodes.push(o);
+
+    List<Operator<?>> children;
+    if (o instanceof SparkPartitionPruningSinkOperator) {
+      children = new ArrayList<>();
+      children.addAll(o.getChildOperators());
+      TableScanOperator ts = ((SparkPartitionPruningSinkDesc) o.getConf()).getTableScan();
+      LOG.debug("Adding special edge: " + o.getName() + " --> " + ts.toString());
+      children.add(ts);
+    } else {
+      children = o.getChildOperators();
+    }
+
+    for (Operator<?> child : children) {
+      if (!indexes.containsKey(child)) {
+        connect(child, index, nodes, indexes, lowLinks, components);
+        lowLinks.put(o, Math.min(lowLinks.get(o), lowLinks.get(child)));
+      } else if (nodes.contains(child)) {
+        lowLinks.put(o, Math.min(lowLinks.get(o), indexes.get(child)));
+      }
+    }
+
+    if (lowLinks.get(o).equals(indexes.get(o))) {
+      Set<Operator<?>> component = new HashSet<Operator<?>>();
+      components.add(component);
+      Operator<?> current;
+      do {
+        current = nodes.pop();
+        component.add(current);
+      } while (current != o);
+    }
+  }
+
   private void runStatsAnnotation(OptimizeSparkProcContext procCtx) throws SemanticException {
     new AnnotateWithStatistics().transform(procCtx.getParseContext());
     new AnnotateWithOpTraits().transform(procCtx.getParseContext());