Posted to commits@hive.apache.org by xu...@apache.org on 2014/11/07 00:06:16 UTC
svn commit: r1637255 - in
/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql:
exec/spark/SparkClient.java optimizer/spark/SetSparkReducerParallelism.java
Author: xuefu
Date: Thu Nov 6 23:06:15 2014
New Revision: 1637255
URL: http://svn.apache.org/r1637255
Log:
HIVE-8649: Increase level of parallelism in reduce phase [Spark Branch] (Jimmy via Xuefu)
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java?rev=1637255&r1=1637254&r2=1637255&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java Thu Nov 6 23:06:15 2014
@@ -18,8 +18,15 @@
package org.apache.hadoop.hive.ql.exec.spark;
-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -41,15 +48,16 @@ import org.apache.hadoop.hive.ql.session
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.ui.jobs.JobProgressListener;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Serializable;
-import java.util.*;
+import scala.Tuple2;
+
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
public class SparkClient implements Serializable {
private static final long serialVersionUID = 1L;
@@ -73,6 +81,32 @@ public class SparkClient implements Seri
return client;
}
+ /**
+ * Get Spark shuffle memory per task, and total number of cores. This
+ * information can be used to estimate how many reduce tasks a job should use.
+ *
+ * @return a tuple, the first element is the shuffle memory per task in bytes,
+ * the second element is the number of total cores usable by the client
+ */
+ public static Tuple2<Long, Integer>
+ getMemoryAndCores(Configuration hiveConf) {
+ SparkClient client = getInstance(hiveConf);
+ SparkContext sc = client.sc.sc();
+ SparkConf sparkConf = sc.conf();
+ int cores = sparkConf.getInt("spark.executor.cores", sc.defaultParallelism());
+ double memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.2);
+ // sc.executorMemory() is in MB, need to convert to bytes
+ long memoryPerTask =
+ (long) (sc.executorMemory() * memoryFraction * 1024 * 1024 / cores);
+ int executors = sc.getExecutorMemoryStatus().size();
+ int totalCores = executors * cores;
+ LOG.info("Spark cluster current has executors: " + executors
+ + ", cores per executor: " + cores + ", memory per executor: "
+ + sc.executorMemory() + "M, shuffle memoryFraction: " + memoryFraction);
+ return new Tuple2<Long, Integer>(Long.valueOf(memoryPerTask),
+ Integer.valueOf(totalCores));
+ }
+
private JavaSparkContext sc;
private List<String> localJars = new ArrayList<String>();
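
As a rough illustration of the arithmetic in getMemoryAndCores above, here is a minimal standalone sketch. The executor memory, memory fraction, and core count below are hypothetical values chosen only to make the units concrete (sc.executorMemory() reports MB; the result is in bytes):

    // Hypothetical inputs: 4096 MB executor heap, the default shuffle
    // memoryFraction of 0.2, and 4 cores per executor.
    long executorMemoryMb = 4096;   // assumed sc.executorMemory(), in MB
    double memoryFraction = 0.2;    // assumed spark.shuffle.memoryFraction
    int cores = 4;                  // assumed spark.executor.cores

    // Shuffle memory available to a single task, converted to bytes:
    // 4096 MB * 0.2 / 4 cores = ~204.8 MB per task.
    long memoryPerTask =
        (long) (executorMemoryMb * memoryFraction * 1024 * 1024 / cores);
    // memoryPerTask == 214748364 (about 205 MB)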
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java?rev=1637255&r1=1637254&r2=1637255&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java Thu Nov 6 23:06:15 2014
@@ -18,12 +18,15 @@
package org.apache.hadoop.hive.ql.optimizer.spark;
+import java.util.Stack;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.SparkClient;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -32,7 +35,7 @@ import org.apache.hadoop.hive.ql.parse.s
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
-import java.util.Stack;
+import scala.Tuple2;
/**
* SetSparkReducerParallelism determines how many reducers should
@@ -42,6 +45,9 @@ public class SetSparkReducerParallelism
private static final Log LOG = LogFactory.getLog(SetSparkReducerParallelism.class.getName());
+ // Spark memory per task, and total number of cores
+ private Tuple2<Long, Integer> sparkMemoryAndCores;
+
@Override
public Object process(Node nd, Stack<Node> stack,
NodeProcessorCtx procContext, Object... nodeOutputs)
@@ -52,7 +58,6 @@ public class SetSparkReducerParallelism
ReduceSinkOperator sink = (ReduceSinkOperator) nd;
ReduceSinkDesc desc = sink.getConf();
- long bytesPerReducer = context.getConf().getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
int maxReducers = context.getConf().getIntVar(HiveConf.ConfVars.MAXREDUCERS);
int constantReducers = context.getConf().getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS);
@@ -81,8 +86,20 @@ public class SetSparkReducerParallelism
}
}
+ if (sparkMemoryAndCores == null) {
+ sparkMemoryAndCores = SparkClient.getMemoryAndCores(context.getConf());
+ }
+
+ // Divide the per-task shuffle memory by 2 so that more reducers are used
+ long bytesPerReducer = sparkMemoryAndCores._1.longValue() / 2;
int numReducers = Utilities.estimateReducers(numberOfBytes, bytesPerReducer,
maxReducers, false);
+
+ // Never use fewer reducers than the total number of available cores
+ int cores = sparkMemoryAndCores._2.intValue();
+ if (numReducers < cores) {
+ numReducers = cores;
+ }
LOG.info("Set parallelism for reduce sink " + sink + " to: " + numReducers);
desc.setNumReducers(numReducers);
}
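
Taken together, the SetSparkReducerParallelism change works roughly like the sketch below. The input sizes are hypothetical, and the ceil-and-cap expression is only a simplified stand-in for the Utilities.estimateReducers call shown in the diff:

    // Hypothetical inputs: 10,000 MB of map output, the ~205 MB
    // per-task shuffle memory from the sketch above, 16 total cores.
    long numberOfBytes = 10000L * 1024 * 1024;
    long memoryPerTask = 214748364L;
    int totalCores = 16;
    int maxReducers = 999;          // assumed hive.exec.reducers.max

    // Halve the per-task shuffle memory so more reducers are used.
    long bytesPerReducer = memoryPerTask / 2;

    // Simplified stand-in for Utilities.estimateReducers: ceil-divide
    // and cap at maxReducers. Here: ceil(10485760000 / 107374182) = 98.
    int numReducers = (int) Math.min(maxReducers,
        (numberOfBytes + bytesPerReducer - 1) / bytesPerReducer);

    // Never run fewer reducers than the cluster has cores.
    if (numReducers < totalCores) {
      numReducers = totalCores;
    }

The effect is to keep every available core busy even for small inputs, while large inputs still scale the reducer count up toward maxReducers.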