Posted to commits@hive.apache.org by li...@apache.org on 2016/12/19 03:21:32 UTC
hive git commit: HIVE-15428: HoS DPP doesn't remove cyclic dependency (Rui Li reviewed by Chao Sun)
Repository: hive
Updated Branches:
refs/heads/master 65940caca -> c6a7edd5a
HIVE-15428: HoS DPP doesn't remove cyclic dependency (Rui Li reviewed by Chao Sun)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c6a7edd5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c6a7edd5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c6a7edd5
Branch: refs/heads/master
Commit: c6a7edd5a299cfca2e35ddb11edabc2ad126f2ad
Parents: 65940ca
Author: Rui Li <li...@apache.org>
Authored: Mon Dec 19 11:21:23 2016 +0800
Committer: Rui Li <li...@apache.org>
Committed: Mon Dec 19 11:21:23 2016 +0800
----------------------------------------------------------------------
.../SparkRemoveDynamicPruningBySize.java | 12 +-
.../hive/ql/parse/spark/GenSparkUtils.java | 19 ++++
.../hive/ql/parse/spark/SparkCompiler.java | 112 +++++++++++++++++++
3 files changed, 133 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/c6a7edd5/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java
index a6bf3af..c41a0c8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SparkRemoveDynamicPruningBySize.java
@@ -23,12 +23,12 @@ import java.util.Stack;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.spark.GenSparkUtils;
import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext;
import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
@@ -54,15 +54,7 @@ public class SparkRemoveDynamicPruningBySize implements NodeProcessor {
if (desc.getStatistics().getDataSize() > context.getConf()
.getLongVar(ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE)) {
- Operator<?> child = op;
- Operator<?> curr = op;
-
- while (curr.getChildOperators().size() <= 1) {
- child = curr;
- curr = curr.getParentOperators().get(0);
- }
-
- curr.removeChild(child);
+ GenSparkUtils.removeBranch(op);
// at this point we've found the fork in the op pipeline that has the pruning as a child plan.
LOG.info("Disabling dynamic pruning for: "
+ desc.getTableScan().getName()
http://git-wip-us.apache.org/repos/asf/hive/blob/c6a7edd5/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index 8a85574..7b2b3c0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -596,4 +596,23 @@ public class GenSparkUtils {
findRoots(p, ops);
}
}
+
+ /**
+ * Remove the branch that contains the specified operator. Do nothing if there's no branching,
+ * i.e. all the upstream operators have only one child.
+ */
+ public static void removeBranch(Operator<?> op) {
+ Operator<?> child = op;
+ Operator<?> curr = op;
+
+ while (curr.getChildOperators().size() <= 1) {
+ child = curr;
+ if (curr.getParentOperators() == null || curr.getParentOperators().isEmpty()) {
+ return;
+ }
+ curr = curr.getParentOperators().get(0);
+ }
+
+ curr.removeChild(child);
+ }
}
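
The helper added above walks upward from the given operator through single-child ancestors until it reaches a fork (an operator with more than one child) and detaches the branch there; the new root guard returns early when no fork exists, which the older inline loop in SparkRemoveDynamicPruningBySize did not. A minimal sketch of the same walk, assuming a toy Node class in place of Hive's Operator (all names below are illustrative, not from the patch):

import java.util.ArrayList;
import java.util.List;

class Node {
  final String name;
  final List<Node> parents = new ArrayList<>();
  final List<Node> children = new ArrayList<>();

  Node(String name) { this.name = name; }

  static void link(Node parent, Node child) {
    parent.children.add(child);
    child.parents.add(parent);
  }

  // Mirrors GenSparkUtils.removeBranch: climb single-child ancestors,
  // bail out at a root, otherwise detach the branch at the first fork.
  static void removeBranch(Node op) {
    Node child = op;
    Node curr = op;
    while (curr.children.size() <= 1) {
      child = curr;
      if (curr.parents.isEmpty()) {
        return; // reached a root without finding a fork: nothing to remove
      }
      curr = curr.parents.get(0);
    }
    // Unlink both directions, as Operator.removeChild is expected to do.
    curr.children.remove(child);
    child.parents.remove(curr);
  }

  public static void main(String[] args) {
    Node ts = new Node("TS"), fil = new Node("FIL");
    Node sel = new Node("SEL"), gby = new Node("GBY"), sink = new Node("PruningSink");
    link(ts, fil);
    link(fil, sel);  // main pipeline
    link(fil, gby);  // pruning branch: FIL -> GBY -> PruningSink
    link(gby, sink);

    removeBranch(sink);
    System.out.println(fil.children.size()); // 1: the pruning branch is gone

    removeBranch(sel); // chain with no fork above it reaches the root TS...
    System.out.println(ts.children.size());  // ...and is left intact: still 1
  }
}

The second call exercises the guard: with the pruning branch already removed, no operator above SEL has more than one child, so the walk hits the root and returns without modifying the plan.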
http://git-wip-us.apache.org/repos/asf/hive/blob/c6a7edd5/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
index baf77c7..71528e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
@@ -20,11 +20,13 @@ package org.apache.hadoop.hive.ql.parse.spark;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Stack;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
@@ -73,6 +75,7 @@ import org.apache.hadoop.hive.ql.optimizer.spark.CombineEquivalentWorkResolver;
import org.apache.hadoop.hive.ql.optimizer.spark.SetSparkReducerParallelism;
import org.apache.hadoop.hive.ql.optimizer.spark.SparkJoinHintOptimizer;
import org.apache.hadoop.hive.ql.optimizer.spark.SparkJoinOptimizer;
+import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
import org.apache.hadoop.hive.ql.optimizer.spark.SparkReduceSinkMapJoinProc;
import org.apache.hadoop.hive.ql.optimizer.spark.SparkSkewJoinResolver;
import org.apache.hadoop.hive.ql.optimizer.spark.SplitSparkWorkResolver;
@@ -116,9 +119,118 @@ public class SparkCompiler extends TaskCompiler {
// Run Join related optimizations
runJoinOptimizations(procCtx);
+ // Remove cyclic dependencies for DPP
+ runCycleAnalysisForPartitionPruning(procCtx);
+
PERF_LOGGER.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_OPERATOR_TREE);
}
+ private void runCycleAnalysisForPartitionPruning(OptimizeSparkProcContext procCtx) {
+ if (!conf.getBoolVar(HiveConf.ConfVars.SPARK_DYNAMIC_PARTITION_PRUNING)) {
+ return;
+ }
+
+ boolean cycleFree = false;
+ while (!cycleFree) {
+ cycleFree = true;
+ Set<Set<Operator<?>>> components = getComponents(procCtx);
+ for (Set<Operator<?>> component : components) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Component: ");
+ for (Operator<?> co : component) {
+ LOG.debug("Operator: " + co.getName() + ", " + co.getIdentifier());
+ }
+ }
+ if (component.size() != 1) {
+ LOG.info("Found cycle in operator plan...");
+ cycleFree = false;
+ removeDPPOperator(component, procCtx);
+ break;
+ }
+ }
+ LOG.info("Cycle free: " + cycleFree);
+ }
+ }
+
+ private void removeDPPOperator(Set<Operator<?>> component, OptimizeSparkProcContext context) {
+ SparkPartitionPruningSinkOperator toRemove = null;
+ for (Operator<?> o : component) {
+ if (o instanceof SparkPartitionPruningSinkOperator) {
+ // we want to remove the DPP sink with the largest data size
+ if (toRemove == null
+ || o.getConf().getStatistics().getDataSize() > toRemove.getConf().getStatistics()
+ .getDataSize()) {
+ toRemove = (SparkPartitionPruningSinkOperator) o;
+ }
+ }
+ }
+
+ if (toRemove == null) {
+ return;
+ }
+
+ GenSparkUtils.removeBranch(toRemove);
+ // at this point we've found the fork in the op pipeline that has the pruning as a child plan.
+ LOG.info("Disabling dynamic pruning for: "
+ + toRemove.getConf().getTableScan().toString() + ". Needed to break cyclic dependency");
+ }
+
+ // Tarjan's algorithm: compute strongly connected components of the operator graph
+ private Set<Set<Operator<?>>> getComponents(OptimizeSparkProcContext procCtx) {
+ AtomicInteger index = new AtomicInteger();
+ Map<Operator<?>, Integer> indexes = new HashMap<Operator<?>, Integer>();
+ Map<Operator<?>, Integer> lowLinks = new HashMap<Operator<?>, Integer>();
+ Stack<Operator<?>> nodes = new Stack<Operator<?>>();
+ Set<Set<Operator<?>>> components = new HashSet<Set<Operator<?>>>();
+
+ for (Operator<?> o : procCtx.getParseContext().getTopOps().values()) {
+ if (!indexes.containsKey(o)) {
+ connect(o, index, nodes, indexes, lowLinks, components);
+ }
+ }
+ return components;
+ }
+
+ private void connect(Operator<?> o, AtomicInteger index, Stack<Operator<?>> nodes,
+ Map<Operator<?>, Integer> indexes, Map<Operator<?>, Integer> lowLinks,
+ Set<Set<Operator<?>>> components) {
+
+ indexes.put(o, index.get());
+ lowLinks.put(o, index.get());
+ index.incrementAndGet();
+ nodes.push(o);
+
+ List<Operator<?>> children;
+ if (o instanceof SparkPartitionPruningSinkOperator) {
+ children = new ArrayList<>();
+ children.addAll(o.getChildOperators());
+ TableScanOperator ts = ((SparkPartitionPruningSinkDesc) o.getConf()).getTableScan();
+ LOG.debug("Adding special edge: " + o.getName() + " --> " + ts.toString());
+ children.add(ts);
+ } else {
+ children = o.getChildOperators();
+ }
+
+ for (Operator<?> child : children) {
+ if (!indexes.containsKey(child)) {
+ connect(child, index, nodes, indexes, lowLinks, components);
+ lowLinks.put(o, Math.min(lowLinks.get(o), lowLinks.get(child)));
+ } else if (nodes.contains(child)) {
+ lowLinks.put(o, Math.min(lowLinks.get(o), indexes.get(child)));
+ }
+ }
+
+ if (lowLinks.get(o).equals(indexes.get(o))) {
+ Set<Operator<?>> component = new HashSet<Operator<?>>();
+ components.add(component);
+ Operator<?> current;
+ do {
+ current = nodes.pop();
+ component.add(current);
+ } while (current != o);
+ }
+ }
+
private void runStatsAnnotation(OptimizeSparkProcContext procCtx) throws SemanticException {
new AnnotateWithStatistics().transform(procCtx.getParseContext());
new AnnotateWithOpTraits().transform(procCtx.getParseContext());
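
For readers unfamiliar with the cycle check: getComponents()/connect() above run Tarjan's strongly-connected-components algorithm over the operator graph, with one twist — each SparkPartitionPruningSinkOperator gets an extra edge to the TableScanOperator it prunes, so a pruning sink whose target scan feeds back into the sink's own pipeline surfaces as a component of size greater than one. runCycleAnalysisForPartitionPruning then reruns the analysis, each round removing the offending pruning sink with the largest data size, until no such component remains. Below is a standalone sketch of the same algorithm over a plain string-keyed graph — a generic reimplementation for illustration, not Hive code; node names are made up, and for simplicity the special edge points back to the sink's own source scan, whereas in a real plan the target is a different table scan in another work:

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class TarjanSketch {
  private int index = 0;
  private final Map<String, List<String>> graph;
  private final Map<String, Integer> indexes = new HashMap<>();
  private final Map<String, Integer> lowLinks = new HashMap<>();
  private final Deque<String> stack = new ArrayDeque<>();
  private final Set<String> onStack = new HashSet<>(); // O(1) stack-membership check
  private final Set<Set<String>> components = new HashSet<>();

  TarjanSketch(Map<String, List<String>> graph) { this.graph = graph; }

  Set<Set<String>> components() {
    for (String v : graph.keySet()) {
      if (!indexes.containsKey(v)) {
        connect(v);
      }
    }
    return components;
  }

  private void connect(String v) {
    indexes.put(v, index);
    lowLinks.put(v, index);
    index++;
    stack.push(v);
    onStack.add(v);
    for (String w : graph.getOrDefault(v, Collections.emptyList())) {
      if (!indexes.containsKey(w)) {
        connect(w);
        lowLinks.put(v, Math.min(lowLinks.get(v), lowLinks.get(w)));
      } else if (onStack.contains(w)) {
        lowLinks.put(v, Math.min(lowLinks.get(v), indexes.get(w)));
      }
    }
    if (lowLinks.get(v).equals(indexes.get(v))) { // v is the root of a component
      Set<String> component = new HashSet<>();
      String w;
      do {
        w = stack.pop();
        onStack.remove(w);
        component.add(w);
      } while (!w.equals(v));
      components.add(component);
    }
  }

  public static void main(String[] args) {
    // The pruning branch hangs off TS_1's pipeline, and the special edge
    // PruningSink -> TS_1 closes the loop.
    Map<String, List<String>> g = new HashMap<>();
    g.put("TS_1", Arrays.asList("FIL"));
    g.put("FIL", Arrays.asList("SEL", "GBY"));
    g.put("SEL", Collections.emptyList());
    g.put("GBY", Arrays.asList("PruningSink"));
    g.put("PruningSink", Arrays.asList("TS_1")); // special edge to the pruned scan
    for (Set<String> c : new TarjanSketch(g).components()) {
      if (c.size() > 1) { // same test as runCycleAnalysisForPartitionPruning
        System.out.println("Cycle: " + c); // TS_1, FIL, GBY, PruningSink
      }
    }
  }
}

One small difference from the patch: the sketch tracks stack membership with a HashSet, while connect() above uses Stack.contains, which is linear in the stack depth; both compute the same components.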