Posted to commits@hive.apache.org by br...@apache.org on 2014/08/06 05:05:13 UTC
svn commit: r1616085 - in
/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql: exec/spark/
optimizer/ optimizer/spark/ parse/spark/
Author: brock
Date: Wed Aug 6 03:05:13 2014
New Revision: 1616085
URL: http://svn.apache.org/r1616085
Log:
HIVE-7567 - support automatically calculating reduce task number (Chengxiang Li via Brock) [Spark Branch]
Added:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/OptimizeSparkProcContext.java
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
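
The core of the change: for each ReduceWork, SparkTask now picks the reducer count in a fixed order. A value already settled at compile time wins; otherwise an explicit jobconf setting is used; otherwise the count is estimated from the input size and clamped by the configured maximum. A minimal standalone sketch of that cascade (illustrative only; the class and method names, the rounding, and the sample numbers are assumptions, not part of the patch; the real math lives in Utilities.estimateReducers / estimateNumberOfReducers):

    // Illustrative sketch of the reducer-count cascade this patch introduces.
    // Not part of the commit; names, rounding and numbers are assumptions.
    public final class ReducerCountSketch {

      static int pickReducers(int compileTimeReducers, int jobConfReducers,
          long totalInputBytes, long bytesPerReducer, int maxReducers) {
        if (compileTimeReducers >= 0) {
          return compileTimeReducers;             // decided by the optimizer
        }
        if (jobConfReducers > 0) {
          return jobConfReducers;                 // user pinned a constant count
        }
        // Fall back to an estimate from data size, clamped to [1, maxReducers].
        long estimated = (totalInputBytes + bytesPerReducer - 1) / bytesPerReducer;
        return (int) Math.min((long) maxReducers, Math.max(1L, estimated));
      }

      public static void main(String[] args) {
        // e.g. 10 GB of shuffle input at 256 MB per reducer -> 40 reducers (cap 999)
        System.out.println(pickReducers(-1, 0, 10L << 30, 256L << 20, 999));
      }
    }
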
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java?rev=1616085&r1=1616084&r2=1616085&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java Wed Aug 6 03:05:13 2014
@@ -18,28 +18,48 @@
package org.apache.hadoop.hive.ql.exec.spark;
+import java.io.IOException;
import java.util.List;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.SparkWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.StringUtils;
public class SparkTask extends Task<SparkWork> {
private static final long serialVersionUID = 1L;
+ private transient JobConf job;
+ private transient ContentSummary inputSummary;
+
+ @Override
+ public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) {
+ super.initialize(conf, queryPlan, driverContext);
+ job = new JobConf(conf, SparkTask.class);
+ }
@Override
public int execute(DriverContext driverContext) {
+
int rc = 1;
SparkClient client = null;
try {
+ configureNumberOfReducers();
client = SparkClient.getInstance(driverContext.getCtx().getConf());
rc = client.execute(driverContext, getWork());
- } finally {
+ } catch (Exception e) {
+ LOG.error("Failed to execute spark task.", e);
+ return 1;
+ }
+ finally {
if (client != null) {
rc = close(rc);
}
@@ -86,4 +106,49 @@ public class SparkTask extends Task<Spar
return "SPARK";
}
+ /**
+ * Set the number of reducers for the spark work.
+ */
+ private void configureNumberOfReducers() throws IOException {
+ for (BaseWork baseWork : work.getAllWork()) {
+ if (baseWork instanceof ReduceWork) {
+ configureNumberOfReducers((ReduceWork) baseWork);
+ }
+ }
+
+ console.printInfo("In order to change the average load for a reducer (in bytes):");
+ console.printInfo(" set " + HiveConf.ConfVars.BYTESPERREDUCER.varname + "=<number>");
+ console.printInfo("In order to limit the maximum number of reducers:");
+ console.printInfo(" set " + HiveConf.ConfVars.MAXREDUCERS.varname + "=<number>");
+ console.printInfo("In order to set a constant number of reducers:");
+ console.printInfo(" set " + HiveConf.ConfVars.HADOOPNUMREDUCERS + "=<number>");
+ }
+
+ private void configureNumberOfReducers(ReduceWork rWork) throws IOException {
+ // this is a temporary hack to fix things that are not fixed in the compiler
+ Integer numReducersFromWork = rWork == null ? 0 : rWork.getNumReduceTasks();
+
+ if (rWork == null) {
+ console.printInfo("Number of reduce tasks is set to 0 since there's no reduce operator");
+ } else {
+ if (numReducersFromWork >= 0) {
+ console.printInfo("Number of reduce tasks determined at compile time: "
+ + rWork.getNumReduceTasks());
+ } else if (job.getNumReduceTasks() > 0) {
+ int reducers = job.getNumReduceTasks();
+ rWork.setNumReduceTasks(reducers);
+ console.printInfo("Number of reduce tasks not specified. Defaulting to jobconf value of: "
+ + reducers);
+ } else {
+ if (inputSummary == null) {
+ inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work.getMapWork(), null);
+ }
+ int reducers = Utilities.estimateNumberOfReducers(conf, inputSummary, work.getMapWork(),
+ false);
+ rWork.setNumReduceTasks(reducers);
+ console.printInfo("Number of reduce tasks not specified. Estimated from input data size: "
+ + reducers);
+ }
+ }
+ }
}
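
In practical terms, the hints printed above map to three session-level knobs a user can set before running a query (the property names are what those ConfVars are believed to resolve to on this branch; the values are illustrative):

    set hive.exec.reducers.bytes.per.reducer=256000000;
    set hive.exec.reducers.max=999;
    set mapred.reduce.tasks=40;

A positive mapred.reduce.tasks pins a constant reducer count; leaving it at its default lets SparkTask fall through to the size-based estimate.
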
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1616085&r1=1616084&r2=1616085&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Wed Aug 6 03:05:13 2014
@@ -126,7 +126,8 @@ public class Optimizer {
transformations.add(new StatsOptimizer());
}
if (pctx.getContext().getExplain() ||
- HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+ HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez") ||
+ HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
transformations.add(new AnnotateWithStatistics());
transformations.add(new AnnotateWithOpTraits());
}
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java?rev=1616085&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java Wed Aug 6 03:05:13 2014
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.spark;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+
+import java.util.Stack;
+
+/**
+ * SetSparkReducerParallelism determines how many reducers should
+ * be run for a given reduce sink; cloned from SetReducerParallelism.
+ */
+public class SetSparkReducerParallelism implements NodeProcessor {
+
+ private static final Log LOG = LogFactory.getLog(SetSparkReducerParallelism.class.getName());
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack,
+ NodeProcessorCtx procContext, Object... nodeOutputs)
+ throws SemanticException {
+
+ OptimizeSparkProcContext context = (OptimizeSparkProcContext) procContext;
+
+ ReduceSinkOperator sink = (ReduceSinkOperator) nd;
+ ReduceSinkDesc desc = sink.getConf();
+
+ long bytesPerReducer = context.getConf().getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
+ int maxReducers = context.getConf().getIntVar(HiveConf.ConfVars.MAXREDUCERS);
+ int constantReducers = context.getConf().getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS);
+
+ if (context.getVisitedReduceSinks().contains(sink)) {
+ // skip walking the children
+ LOG.debug("Already processed reduce sink: " + sink.getName());
+ return true;
+ }
+
+ context.getVisitedReduceSinks().add(sink);
+
+ if (desc.getNumReducers() <= 0) {
+ if (constantReducers > 0) {
+ LOG.info("Parallelism for reduce sink " + sink + " set by user to " + constantReducers);
+ desc.setNumReducers(constantReducers);
+ } else {
+ long numberOfBytes = 0;
+
+ // we need to add up all the estimates from the siblings of this reduce sink
+ for (Operator<? extends OperatorDesc> sibling:
+ sink.getChildOperators().get(0).getParentOperators()) {
+ if (sibling.getStatistics() != null) {
+ numberOfBytes += sibling.getStatistics().getDataSize();
+ } else {
+ LOG.warn("No stats available from: " + sibling);
+ }
+ }
+
+ int numReducers = Utilities.estimateReducers(numberOfBytes, bytesPerReducer,
+ maxReducers, false);
+ LOG.info("Set parallelism for reduce sink " + sink + " to: " + numReducers);
+ desc.setNumReducers(numReducers);
+ desc.setAutoParallel(true);
+ }
+ } else {
+ LOG.info("Number of reducers determined to be: " + desc.getNumReducers());
+ }
+
+ return false;
+ }
+
+}
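
A worked example of this compile-time path (illustrative numbers): if the reduce sink's two sibling operators report 3 GB and 1 GB in their statistics (statistics that are only populated because the Optimizer change above also enables AnnotateWithStatistics for the spark engine), numberOfBytes comes to 4 GB; with hive.exec.reducers.bytes.per.reducer at 256 MB and hive.exec.reducers.max at 999, Utilities.estimateReducers should land on roughly ceil(4096 MB / 256 MB) = 16 reducers, assuming it rounds up and clamps to the configured maximum the way the analogous Tez/MapReduce path does.
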
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java?rev=1616085&r1=1616084&r2=1616085&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java Wed Aug 6 03:05:13 2014
@@ -82,14 +82,6 @@ public class GenSparkUtils {
Preconditions.checkArgument(!root.getParentOperators().isEmpty(),
"AssertionError: expected root.getParentOperators() to be non-empty");
- boolean isAutoReduceParallelism =
- context.conf.getBoolVar(HiveConf.ConfVars.TEZ_AUTO_REDUCER_PARALLELISM);
-
- float maxPartitionFactor =
- context.conf.getFloatVar(HiveConf.ConfVars.TEZ_MAX_PARTITION_FACTOR);
- float minPartitionFactor = context.conf.getFloatVar(HiveConf.ConfVars.TEZ_MIN_PARTITION_FACTOR);
- long bytesPerReducer = context.conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
-
ReduceWork reduceWork = new ReduceWork("Reducer "+ (++sequenceNumber));
logger.debug("Adding reduce work (" + reduceWork.getName() + ") for " + root);
reduceWork.setReducer(root);
@@ -106,31 +98,12 @@ public class GenSparkUtils {
reduceWork.setNumReduceTasks(reduceSink.getConf().getNumReducers());
- if (isAutoReduceParallelism && reduceSink.getConf().isAutoParallel()) {
- reduceWork.setAutoReduceParallelism(true);
-
- // configured limit for reducers
- int maxReducers = context.conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
-
- // min we allow spark to pick
- int minPartition = Math.max(1, (int) (reduceSink.getConf().getNumReducers()
- * minPartitionFactor));
- minPartition = (minPartition > maxReducers) ? maxReducers : minPartition;
-
- // max we allow spark to pick
- int maxPartition = (int) (reduceSink.getConf().getNumReducers() * maxPartitionFactor);
- maxPartition = (maxPartition > maxReducers) ? maxReducers : maxPartition;
-
- reduceWork.setMinReduceTasks(minPartition);
- reduceWork.setMaxReduceTasks(maxPartition);
- }
-
setupReduceSink(context, reduceWork, reduceSink);
sparkWork.add(reduceWork);
SparkEdgeProperty edgeProp = new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_NONE,
- reduceSink.getConf().getNumReducers());
+ reduceWork.getNumReduceTasks());
if (root instanceof GroupByOperator) {
edgeProp.setShuffleGroup();
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/OptimizeSparkProcContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/OptimizeSparkProcContext.java?rev=1616085&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/OptimizeSparkProcContext.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/OptimizeSparkProcContext.java Wed Aug 6 03:05:13 2014
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.spark;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * OptimizeSparkProcContext. OptimizeSparkProcContext maintains information
+ * about the current operator plan as we walk the operator tree
+ * to do some additional optimizations on it. Cloned from OptimizeTezProcContext.
+ *
+ */
+public class OptimizeSparkProcContext implements NodeProcessorCtx {
+
+ private final ParseContext parseContext;
+ private final HiveConf conf;
+ private final Set<ReadEntity> inputs;
+ private final Set<WriteEntity> outputs;
+ private final Set<ReduceSinkOperator> visitedReduceSinks = new HashSet<ReduceSinkOperator>();
+
+ // rootOperators are all the table scan operators in sequence
+ // of traversal
+ private final Deque<Operator<? extends OperatorDesc>> rootOperators;
+
+ public OptimizeSparkProcContext(HiveConf conf, ParseContext parseContext,
+ Set<ReadEntity> inputs, Set<WriteEntity> outputs,
+ Deque<Operator<? extends OperatorDesc>> rootOperators) {
+
+ this.conf = conf;
+ this.parseContext = parseContext;
+ this.inputs = inputs;
+ this.outputs = outputs;
+ this.rootOperators = rootOperators;
+ }
+
+ public ParseContext getParseContext() {
+ return parseContext;
+ }
+
+ public HiveConf getConf() {
+ return conf;
+ }
+
+ public Set<ReadEntity> getInputs() {
+ return inputs;
+ }
+
+ public Set<WriteEntity> getOutputs() {
+ return outputs;
+ }
+
+ public Set<ReduceSinkOperator> getVisitedReduceSinks() {
+ return visitedReduceSinks;
+ }
+
+ public Deque<Operator<? extends OperatorDesc>> getRootOperators() {
+ return rootOperators;
+ }
+}
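
OptimizeSparkProcContext is a plain value holder; its main job is to carry the conf and the visited-reduce-sink set that SetSparkReducerParallelism consults so each sink is processed only once. A hypothetical wiring sketch of how a compiler builds it (the actual construction line is not among the hunks shown, so everything here beyond the constructor signature defined above is an assumption):

    // Hypothetical wiring sketch; assumes Hive's ql classes on the classpath.
    import java.util.Deque;
    import java.util.LinkedList;
    import java.util.Set;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.hooks.ReadEntity;
    import org.apache.hadoop.hive.ql.hooks.WriteEntity;
    import org.apache.hadoop.hive.ql.parse.ParseContext;
    import org.apache.hadoop.hive.ql.parse.spark.OptimizeSparkProcContext;
    import org.apache.hadoop.hive.ql.plan.OperatorDesc;

    final class ProcContextWiringSketch {
      static OptimizeSparkProcContext build(HiveConf conf, ParseContext pCtx,
          Set<ReadEntity> inputs, Set<WriteEntity> outputs) {
        // The walk starts from the TableScan operators, in traversal order.
        Deque<Operator<? extends OperatorDesc>> roots =
            new LinkedList<Operator<? extends OperatorDesc>>();
        roots.addAll(pCtx.getTopOps().values());
        return new OptimizeSparkProcContext(conf, pCtx, inputs, outputs, roots);
      }
    }
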
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java?rev=1616085&r1=1616084&r2=1616085&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java Wed Aug 6 03:05:13 2014
@@ -19,8 +19,10 @@ package org.apache.hadoop.hive.ql.parse.
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -40,6 +42,7 @@ import org.apache.hadoop.hive.ql.hooks.W
import org.apache.hadoop.hive.ql.lib.CompositeProcessor;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.ForwardWalker;
import org.apache.hadoop.hive.ql.lib.GraphWalker;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
@@ -48,18 +51,19 @@ import org.apache.hadoop.hive.ql.lib.Rul
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.optimizer.physical.CrossProductCheck;
import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
-import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
import org.apache.hadoop.hive.ql.optimizer.physical.StageIDsRearranger;
+import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
+import org.apache.hadoop.hive.ql.optimizer.spark.SetSparkReducerParallelism;
+import org.apache.hadoop.hive.ql.parse.GlobalLimitCtx;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.TaskCompiler;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.SparkWork;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hadoop.hive.ql.parse.GlobalLimitCtx;
-import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.TaskCompiler;
/**
* SparkCompiler translates the operator plan into SparkTasks.
@@ -87,9 +91,8 @@ public class SparkCompiler extends TaskC
protected void optimizeOperatorPlan(ParseContext pCtx, Set<ReadEntity> inputs,
Set<WriteEntity> outputs) throws SemanticException {
// TODO: need to add spark specific optimization.
-/*
// Sequence of TableScan operators to be walked
- Deque<Operator<?>> deque = new LinkedList<Operator<?>>();
+ Deque<Operator<? extends OperatorDesc>> deque = new LinkedList<Operator<? extends OperatorDesc>>();
deque.addAll(pCtx.getTopOps().values());
// Create the context for the walker
@@ -99,12 +102,13 @@ public class SparkCompiler extends TaskC
// create a walker which walks the tree in a DFS manner while maintaining
// the operator stack.
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- opRules.put(new RuleRegExp(new String("Set parallelism - ReduceSink"),
+ opRules.put(new RuleRegExp("Set parallelism - ReduceSink",
ReduceSinkOperator.getOperatorName() + "%"),
- new SetReducerParallelism());
+ new SetSparkReducerParallelism());
- opRules.put(new RuleRegExp(new String("Convert Join to Map-join"),
- JoinOperator.getOperatorName() + "%"), new ConvertJoinMapJoin());
+ // TODO: research and verify whether the convert-join-to-map-join optimization can be supported.
+ //opRules.put(new RuleRegExp(new String("Convert Join to Map-join"),
+ // JoinOperator.getOperatorName() + "%"), new ConvertJoinMapJoin());
// The dispatcher fires the processor corresponding to the closest matching
// rule and passes the context along
@@ -113,7 +117,6 @@ public class SparkCompiler extends TaskC
topNodes.addAll(pCtx.getTopOps().values());
GraphWalker ogw = new ForwardWalker(disp);
ogw.startWalking(topNodes, null);
-*/
}
/**