You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by br...@apache.org on 2014/08/06 05:05:13 UTC

svn commit: r1616085 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql: exec/spark/ optimizer/ optimizer/spark/ parse/spark/

Author: brock
Date: Wed Aug  6 03:05:13 2014
New Revision: 1616085

URL: http://svn.apache.org/r1616085
Log:
HIVE-7567 - support automatic calculating reduce task number (Chengxiang Li via Brock) [Spark Branch]

Added:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/OptimizeSparkProcContext.java
Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java?rev=1616085&r1=1616084&r2=1616085&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java Wed Aug  6 03:05:13 2014
@@ -18,28 +18,48 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import java.io.IOException;
 import java.util.List;
 
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.StringUtils;
 
 public class SparkTask extends Task<SparkWork> {
   private static final long serialVersionUID = 1L;
+  private transient JobConf job;
+  private transient ContentSummary inputSummary;
+
+  @Override
+  public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) {
+    super.initialize(conf, queryPlan, driverContext);
+    job = new JobConf(conf, SparkTask.class);
+  }
 
   @Override
   public int execute(DriverContext driverContext) {
+
     int rc = 1;
     SparkClient client = null;
     try {
+      configureNumberOfReducers();
       client = SparkClient.getInstance(driverContext.getCtx().getConf());
       rc = client.execute(driverContext, getWork());
-    } finally {
+    } catch (Exception e) {
+      LOG.error("Failed to execute spark task.", e);
+      return 1;
+    }
+    finally {
       if (client != null) {
         rc = close(rc);
       }
@@ -86,4 +106,49 @@ public class SparkTask extends Task<Spar
     return "SPARK";
   }
 
+  /**
+   * Set the number of reducers for the spark work.
+   */
+  private void configureNumberOfReducers() throws IOException {
+    for (BaseWork baseWork : work.getAllWork()) {
+      if (baseWork instanceof ReduceWork) {
+        configureNumberOfReducers((ReduceWork) baseWork);
+      }
+    }
+
+    console.printInfo("In order to change the average load for a reducer (in bytes):");
+    console.printInfo("  set " + HiveConf.ConfVars.BYTESPERREDUCER.varname + "=<number>");
+    console.printInfo("In order to limit the maximum number of reducers:");
+    console.printInfo("  set " + HiveConf.ConfVars.MAXREDUCERS.varname + "=<number>");
+    console.printInfo("In order to set a constant number of reducers:");
+    console.printInfo("  set " + HiveConf.ConfVars.HADOOPNUMREDUCERS + "=<number>");
+  }
+
+  private void configureNumberOfReducers(ReduceWork rWork) throws IOException {
+    // this is a temporary hack to fix things that are not fixed in the compiler
+    Integer numReducersFromWork = rWork == null ? 0 : rWork.getNumReduceTasks();
+
+    if (rWork == null) {
+      console.printInfo("Number of reduce tasks is set to 0 since there's no reduce operator");
+    } else {
+      if (numReducersFromWork >= 0) {
+        console.printInfo("Number of reduce tasks determined at compile time: "
+          + rWork.getNumReduceTasks());
+      } else if (job.getNumReduceTasks() > 0) {
+        int reducers = job.getNumReduceTasks();
+        rWork.setNumReduceTasks(reducers);
+        console.printInfo("Number of reduce tasks not specified. Defaulting to jobconf value of: "
+            + reducers);
+      } else {
+        if (inputSummary == null) {
+          inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work.getMapWork(), null);
+        }
+        int reducers = Utilities.estimateNumberOfReducers(conf, inputSummary, work.getMapWork(),
+          false);
+        rWork.setNumReduceTasks(reducers);
+        console.printInfo("Number of reduce tasks not specified. Estimated from input data size: "
+            + reducers);
+      }
+    }
+  }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1616085&r1=1616084&r2=1616085&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Wed Aug  6 03:05:13 2014
@@ -126,7 +126,8 @@ public class Optimizer {
       transformations.add(new StatsOptimizer());
     }
     if (pctx.getContext().getExplain() ||
-        HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+        HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez") ||
+        HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
       transformations.add(new AnnotateWithStatistics());
       transformations.add(new AnnotateWithOpTraits());
     }

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java?rev=1616085&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java Wed Aug  6 03:05:13 2014
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.spark;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+
+import java.util.Stack;
+
+/**
+ * SetSparkReducerParallelism determines how many reducers should
+ * be run for a given reduce sink. Cloned from SetReducerParallelism.
+ */
+public class SetSparkReducerParallelism implements NodeProcessor {
+
+  private static final Log LOG = LogFactory.getLog(SetSparkReducerParallelism.class.getName());
+
+  @Override
+  public Object process(Node nd, Stack<Node> stack,
+      NodeProcessorCtx procContext, Object... nodeOutputs)
+      throws SemanticException {
+
+    OptimizeSparkProcContext context = (OptimizeSparkProcContext) procContext;
+
+    ReduceSinkOperator sink = (ReduceSinkOperator) nd;
+    ReduceSinkDesc desc = sink.getConf();
+
+    long bytesPerReducer = context.getConf().getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
+    int maxReducers = context.getConf().getIntVar(HiveConf.ConfVars.MAXREDUCERS);
+    int constantReducers = context.getConf().getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS);
+
+    if (context.getVisitedReduceSinks().contains(sink)) {
+      // skip walking the children
+      LOG.debug("Already processed reduce sink: " + sink.getName());
+      return true;
+    }
+
+    context.getVisitedReduceSinks().add(sink);
+
+    if (desc.getNumReducers() <= 0) {
+      if (constantReducers > 0) {
+        LOG.info("Parallelism for reduce sink " + sink + " set by user to " + constantReducers);
+        desc.setNumReducers(constantReducers);
+      } else {
+        long numberOfBytes = 0;
+
+        // we need to add up all the estimates from the siblings of this reduce sink
+        for (Operator<? extends OperatorDesc> sibling:
+          sink.getChildOperators().get(0).getParentOperators()) {
+          if (sibling.getStatistics() != null) {
+            numberOfBytes += sibling.getStatistics().getDataSize();
+          } else {
+            LOG.warn("No stats available from: " + sibling);
+          }
+        }
+
+        int numReducers = Utilities.estimateReducers(numberOfBytes, bytesPerReducer,
+            maxReducers, false);
+        LOG.info("Set parallelism for reduce sink " + sink + " to: " + numReducers);
+        desc.setNumReducers(numReducers);
+        desc.setAutoParallel(true);
+      }
+    } else {
+      LOG.info("Number of reducers determined to be: " + desc.getNumReducers());
+    }
+
+    return false;
+  }
+
+}

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java?rev=1616085&r1=1616084&r2=1616085&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java Wed Aug  6 03:05:13 2014
@@ -82,14 +82,6 @@ public class GenSparkUtils {
     Preconditions.checkArgument(!root.getParentOperators().isEmpty(),
         "AssertionError: expected root.getParentOperators() to be non-empty");
 
-    boolean isAutoReduceParallelism =
-        context.conf.getBoolVar(HiveConf.ConfVars.TEZ_AUTO_REDUCER_PARALLELISM);
-
-    float maxPartitionFactor =
-        context.conf.getFloatVar(HiveConf.ConfVars.TEZ_MAX_PARTITION_FACTOR);
-    float minPartitionFactor = context.conf.getFloatVar(HiveConf.ConfVars.TEZ_MIN_PARTITION_FACTOR);
-    long bytesPerReducer = context.conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
-
     ReduceWork reduceWork = new ReduceWork("Reducer "+ (++sequenceNumber));
     logger.debug("Adding reduce work (" + reduceWork.getName() + ") for " + root);
     reduceWork.setReducer(root);
@@ -106,31 +98,12 @@ public class GenSparkUtils {
 
     reduceWork.setNumReduceTasks(reduceSink.getConf().getNumReducers());
 
-    if (isAutoReduceParallelism && reduceSink.getConf().isAutoParallel()) {
-      reduceWork.setAutoReduceParallelism(true);
-
-      // configured limit for reducers
-      int maxReducers = context.conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
-
-      // min we allow spark to pick
-      int minPartition = Math.max(1, (int) (reduceSink.getConf().getNumReducers() 
-        * minPartitionFactor));
-      minPartition = (minPartition > maxReducers) ? maxReducers : minPartition;
-
-      // max we allow spark to pick
-      int maxPartition = (int) (reduceSink.getConf().getNumReducers() * maxPartitionFactor); 
-      maxPartition = (maxPartition > maxReducers) ? maxReducers : maxPartition;
-
-      reduceWork.setMinReduceTasks(minPartition);
-      reduceWork.setMaxReduceTasks(maxPartition);
-    }
-
     setupReduceSink(context, reduceWork, reduceSink);
 
     sparkWork.add(reduceWork);
 
     SparkEdgeProperty edgeProp = new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_NONE,
-        reduceSink.getConf().getNumReducers());
+        reduceWork.getNumReduceTasks());
 
     if (root instanceof GroupByOperator) {
       edgeProp.setShuffleGroup();

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/OptimizeSparkProcContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/OptimizeSparkProcContext.java?rev=1616085&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/OptimizeSparkProcContext.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/OptimizeSparkProcContext.java Wed Aug  6 03:05:13 2014
@@ -0,0 +1,86 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.spark;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * OptimizeSparkProcContext. OptimizeSparkProcContext maintains information
+ * about the current operator plan as we walk the operator tree
+ * to do some additional optimizations on it. Cloned from OptimizeTezProcContext.
+ *
+ */
+public class OptimizeSparkProcContext implements NodeProcessorCtx {
+
+  private final ParseContext parseContext;
+  private final HiveConf conf;
+  private final Set<ReadEntity> inputs;
+  private final Set<WriteEntity> outputs;
+  private final Set<ReduceSinkOperator> visitedReduceSinks = new HashSet<ReduceSinkOperator>();
+
+  // rootOperators are all the table scan operators in sequence
+  // of traversal
+  private final Deque<Operator<? extends OperatorDesc>> rootOperators;
+
+  public OptimizeSparkProcContext(HiveConf conf, ParseContext parseContext,
+    Set<ReadEntity> inputs, Set<WriteEntity> outputs,
+    Deque<Operator<? extends OperatorDesc>> rootOperators) {
+
+    this.conf = conf;
+    this.parseContext = parseContext;
+    this.inputs = inputs;
+    this.outputs = outputs;
+    this.rootOperators = rootOperators;
+  }
+
+  public ParseContext getParseContext() {
+    return parseContext;
+  }
+
+  public HiveConf getConf() {
+    return conf;
+  }
+
+  public Set<ReadEntity> getInputs() {
+    return inputs;
+  }
+
+  public Set<WriteEntity> getOutputs() {
+    return outputs;
+  }
+
+  public Set<ReduceSinkOperator> getVisitedReduceSinks() {
+    return visitedReduceSinks;
+  }
+
+  public Deque<Operator<? extends OperatorDesc>> getRootOperators() {
+    return rootOperators;
+  }
+}

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java?rev=1616085&r1=1616084&r2=1616085&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java Wed Aug  6 03:05:13 2014
@@ -19,8 +19,10 @@ package org.apache.hadoop.hive.ql.parse.
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -40,6 +42,7 @@ import org.apache.hadoop.hive.ql.hooks.W
 import org.apache.hadoop.hive.ql.lib.CompositeProcessor;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.ForwardWalker;
 import org.apache.hadoop.hive.ql.lib.GraphWalker;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
@@ -48,18 +51,19 @@ import org.apache.hadoop.hive.ql.lib.Rul
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.optimizer.physical.CrossProductCheck;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
-import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.StageIDsRearranger;
+import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
+import org.apache.hadoop.hive.ql.optimizer.spark.SetSparkReducerParallelism;
+import org.apache.hadoop.hive.ql.parse.GlobalLimitCtx;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.TaskCompiler;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hadoop.hive.ql.parse.GlobalLimitCtx;
-import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.TaskCompiler;
 
 /**
  * SparkCompiler translates the operator plan into SparkTasks.
@@ -87,9 +91,8 @@ public class SparkCompiler extends TaskC
   protected void optimizeOperatorPlan(ParseContext pCtx, Set<ReadEntity> inputs,
       Set<WriteEntity> outputs) throws SemanticException {
     // TODO: need to add spark specific optimization.
-/*
     // Sequence of TableScan operators to be walked
-    Deque<Operator<?>> deque = new LinkedList<Operator<?>>();
+    Deque<Operator<? extends OperatorDesc>> deque = new LinkedList<Operator<? extends OperatorDesc>>();
     deque.addAll(pCtx.getTopOps().values());
 
     // Create the context for the walker
@@ -99,12 +102,13 @@ public class SparkCompiler extends TaskC
     // create a walker which walks the tree in a DFS manner while maintaining
     // the operator stack.
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-    opRules.put(new RuleRegExp(new String("Set parallelism - ReduceSink"),
+    opRules.put(new RuleRegExp("Set parallelism - ReduceSink",
         ReduceSinkOperator.getOperatorName() + "%"),
-        new SetReducerParallelism());
+        new SetSparkReducerParallelism());
 
-    opRules.put(new RuleRegExp(new String("Convert Join to Map-join"),
-        JoinOperator.getOperatorName() + "%"), new ConvertJoinMapJoin());
+    // TODO: research and verify support for the convert-join-to-map-join optimization.
+    //opRules.put(new RuleRegExp(new String("Convert Join to Map-join"),
+    //    JoinOperator.getOperatorName() + "%"), new ConvertJoinMapJoin());
 
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
@@ -113,7 +117,6 @@ public class SparkCompiler extends TaskC
     topNodes.addAll(pCtx.getTopOps().values());
     GraphWalker ogw = new ForwardWalker(disp);
     ogw.startWalking(topNodes, null);
-*/
   }
 
   /**