Posted to commits@hive.apache.org by xu...@apache.org on 2014/11/07 00:06:16 UTC

svn commit: r1637255 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql: exec/spark/SparkClient.java optimizer/spark/SetSparkReducerParallelism.java

Author: xuefu
Date: Thu Nov  6 23:06:15 2014
New Revision: 1637255

URL: http://svn.apache.org/r1637255
Log:
HIVE-8649: Increase level of parallelism in reduce phase [Spark Branch] (Jimmy via Xuefu)

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java?rev=1637255&r1=1637254&r2=1637255&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java Thu Nov  6 23:06:15 2014
@@ -18,8 +18,15 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
-import com.google.common.base.Splitter;
-import com.google.common.base.Strings;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -41,15 +48,16 @@ import org.apache.hadoop.hive.ql.session
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.ui.jobs.JobProgressListener;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Serializable;
-import java.util.*;
+import scala.Tuple2;
+
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
 
 public class SparkClient implements Serializable {
   private static final long serialVersionUID = 1L;
@@ -73,6 +81,32 @@ public class SparkClient implements Seri
     return client;
   }
 
+  /**
+   * Get Spark shuffle memory per task, and total number of cores. This
+   * information can be used to estimate how many reducers a task can have.
+   *
+   * @return a tuple, the first element is the shuffle memory per task in bytes,
+   *  the second element is the number of total cores usable by the client
+   */
+  public static Tuple2<Long, Integer>
+      getMemoryAndCores(Configuration hiveConf) {
+    SparkClient client = getInstance(hiveConf);
+    SparkContext sc = client.sc.sc();
+    SparkConf sparkConf = sc.conf();
+    int cores = sparkConf.getInt("spark.executor.cores", sc.defaultParallelism());
+    double memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.2);
+    // sc.executorMemory() is in MB, need to convert to bytes
+    long memoryPerTask =
+      (long) (sc.executorMemory() * memoryFraction * 1024 * 1024 / cores);
+    int executors = sc.getExecutorMemoryStatus().size();
+    int totalCores = executors * cores;
+    LOG.info("Spark cluster current has executors: " + executors
+      + ", cores per executor: " + cores + ", memory per executor: "
+      + sc.executorMemory() + "M, shuffle memoryFraction: " + memoryFraction);
+    return new Tuple2<Long, Integer>(Long.valueOf(memoryPerTask),
+      Integer.valueOf(totalCores));
+  }
+
   private JavaSparkContext sc;
 
   private List<String> localJars = new ArrayList<String>();
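
For reference, the new API can be exercised roughly as follows. This is a minimal sketch and not part of the commit: the class name and the standalone main() are illustrative, and it assumes a HiveConf that can actually reach a Spark cluster (getMemoryAndCores() starts the Spark client via getInstance()). The getMemoryAndCores(Configuration) signature and the Tuple2 layout match the code above.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.spark.SparkClient;

import scala.Tuple2;

// Hypothetical caller, for illustration only.
public class MemoryAndCoresProbe {
  public static void main(String[] args) {
    // HiveConf extends Configuration, so it can be passed directly;
    // it must point at a usable Spark master for the client to start.
    HiveConf conf = new HiveConf();

    // _1 = shuffle memory per task in bytes, _2 = total cores across executors.
    Tuple2<Long, Integer> memoryAndCores = SparkClient.getMemoryAndCores(conf);

    long shuffleBytesPerTask = memoryAndCores._1.longValue();
    int totalCores = memoryAndCores._2.intValue();

    System.out.println("Shuffle memory per task: " + shuffleBytesPerTask + " bytes");
    System.out.println("Total usable cores: " + totalCores);
  }
}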

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java?rev=1637255&r1=1637254&r2=1637255&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java Thu Nov  6 23:06:15 2014
@@ -18,12 +18,15 @@
 
 package org.apache.hadoop.hive.ql.optimizer.spark;
 
+import java.util.Stack;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.SparkClient;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -32,7 +35,7 @@ import org.apache.hadoop.hive.ql.parse.s
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 
-import java.util.Stack;
+import scala.Tuple2;
 
 /**
  * SetSparkReducerParallelism determines how many reducers should
@@ -42,6 +45,9 @@ public class SetSparkReducerParallelism 
 
   private static final Log LOG = LogFactory.getLog(SetSparkReducerParallelism.class.getName());
 
+  // Spark memory per task, and total number of cores
+  private Tuple2<Long, Integer> sparkMemoryAndCores;
+
   @Override
   public Object process(Node nd, Stack<Node> stack,
       NodeProcessorCtx procContext, Object... nodeOutputs)
@@ -52,7 +58,6 @@ public class SetSparkReducerParallelism 
     ReduceSinkOperator sink = (ReduceSinkOperator) nd;
     ReduceSinkDesc desc = sink.getConf();
 
-    long bytesPerReducer = context.getConf().getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
     int maxReducers = context.getConf().getIntVar(HiveConf.ConfVars.MAXREDUCERS);
     int constantReducers = context.getConf().getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS);
 
@@ -81,8 +86,20 @@ public class SetSparkReducerParallelism 
           }
         }
 
+        if (sparkMemoryAndCores == null) {
+          sparkMemoryAndCores = SparkClient.getMemoryAndCores(context.getConf());
+        }
+
+        // Divide the shuffle memory per task by 2 so that we can have more reducers
+        long bytesPerReducer = sparkMemoryAndCores._1.longValue() / 2;
         int numReducers = Utilities.estimateReducers(numberOfBytes, bytesPerReducer,
             maxReducers, false);
+
+        // If the cluster has more cores than the estimate, use the number of cores
+        int cores = sparkMemoryAndCores._2.intValue();
+        if (numReducers < cores) {
+          numReducers = cores;
+        }
         LOG.info("Set parallelism for reduce sink " + sink + " to: " + numReducers);
         desc.setNumReducers(numReducers);
       }
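
To make the new sizing concrete, the arithmetic below walks through one assumed cluster shape: 8 executors with 4 cores and 4096 MB each, the 0.2 default for spark.shuffle.memoryFraction, and 1 GB of reduce-side input. The numbers and the ceiling-division stand-in for Utilities.estimateReducers() are illustrative assumptions, not values taken from this patch.

// Worked example of the reducer estimate introduced by this patch (assumed numbers).
public class ReducerEstimateSketch {
  public static void main(String[] args) {
    // Assumed cluster shape -- not from any real deployment.
    long executorMemoryMb = 4096;   // sc.executorMemory() reports MB
    double memoryFraction = 0.2;    // spark.shuffle.memoryFraction default
    int coresPerExecutor = 4;
    int executors = 8;

    // Same formula as SparkClient.getMemoryAndCores().
    long memoryPerTask =
        (long) (executorMemoryMb * memoryFraction * 1024 * 1024 / coresPerExecutor); // ~205 MB
    int totalCores = executors * coresPerExecutor;                                   // 32

    // SetSparkReducerParallelism halves it to get more reducers.
    long bytesPerReducer = memoryPerTask / 2;                                        // ~102 MB

    // Crude stand-in for Utilities.estimateReducers(): ceil(input / bytesPerReducer),
    // capped by an assumed maximum.
    long inputBytes = 1L << 30;                                                      // 1 GB of reduce input
    int maxReducers = 999;                                                           // assumed cap
    long estimate = (inputBytes + bytesPerReducer - 1) / bytesPerReducer;            // 11
    int numReducers = (int) Math.min(maxReducers, estimate);

    // Floor at the total core count, as the patch does.
    if (numReducers < totalCores) {
      numReducers = totalCores;                                                      // raised to 32
    }
    System.out.println("Estimated reducers: " + numReducers);
  }
}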