Posted to commits@hive.apache.org by se...@apache.org on 2015/08/06 02:50:25 UTC

[19/53] [abbrv] hive git commit: HIVE-11363: Prewarm Hive on Spark containers [Spark Branch] (reviewed by Chao)

HIVE-11363: Prewarm Hive on Spark containers [Spark Branch] (reviewed by Chao)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/537114b9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/537114b9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/537114b9

Branch: refs/heads/llap
Commit: 537114b964c71b7a5cd00c9938eadc6d0cf76536
Parents: 89736c8
Author: xzhang <xz...@xzdt>
Authored: Thu Jul 30 12:48:31 2015 -0700
Committer: xzhang <xz...@xzdt>
Committed: Thu Jul 30 12:48:31 2015 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  5 ++
 .../ql/exec/spark/HiveSparkClientFactory.java   |  5 +-
 .../ql/exec/spark/RemoteHiveSparkClient.java    | 51 ++++++++++++++++++--
 3 files changed, 54 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/537114b9/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index bee1756..098e7bd 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2217,6 +2217,11 @@ public class HiveConf extends Configuration {
     SPARK_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE(
         "hive.spark.dynamic.partition.pruning.max.data.size", 100*1024*1024L,
         "Maximum total data size in dynamic pruning."),
+    SPARK_PREWARM_CONTAINERS("hive.spark.prewarm.containers", false, "Whether to prewarm containers for Spark. " +
+      "If enabled, Hive will wait no more than 60 seconds for the containers to come up " +
+      "before any query can be executed."),
+    SPARK_PREWARM_NUM_CONTAINERS("hive.spark.prewarm.num.containers", 10, "The minimum number of containers to be prewarmed for Spark. " +
+      "Applicable only if hive.spark.prewarm.containers is set to true."),
     NWAYJOINREORDER("hive.reorder.nway.joins", true,
       "Runs reordering of tables within single n-way join (i.e.: picks streamtable)"),
     HIVE_LOG_N_RECORDS("hive.log.every.n.records", 0L, new RangeValidator(0L, null),

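For illustration only, here is a minimal, hedged sketch of how the two new properties might be enabled programmatically (the ConfVars constants come from the patch above; the standalone class, the value 4, and the surrounding setup are assumptions, not part of this commit):

  import org.apache.hadoop.hive.conf.HiveConf;

  // Hedged sketch: enable prewarming and ask for 4 warm containers.
  public class PrewarmConfExample {
    public static void main(String[] args) {
      HiveConf hiveConf = new HiveConf();
      hiveConf.setBoolVar(HiveConf.ConfVars.SPARK_PREWARM_CONTAINERS, true);
      hiveConf.setIntVar(HiveConf.ConfVars.SPARK_PREWARM_NUM_CONTAINERS, 4);
      // The same effect in a HiveQL session would be:
      //   set hive.spark.prewarm.containers=true;
      //   set hive.spark.prewarm.num.containers=4;
    }
  }
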
http://git-wip-us.apache.org/repos/asf/hive/blob/537114b9/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index 007db75..e12a97d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hive.spark.client.rpc.RpcConfiguration;
 import org.apache.spark.SparkConf;
-import org.apache.spark.SparkException;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
@@ -52,9 +51,7 @@ public class HiveSparkClientFactory {
   private static final String SPARK_DEFAULT_SERIALIZER = "org.apache.spark.serializer.KryoSerializer";
   private static final String SPARK_DEFAULT_REFERENCE_TRACKING = "false";
 
-  public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf)
-    throws IOException, SparkException {
-
+  public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Exception {
     Map<String, String> sparkConf = initiateSparkConf(hiveconf);
     // Submit spark job through local spark context while spark master is local mode, otherwise submit
     // spark job through remote spark context.

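As a hedged, caller-side sketch only (the factory method signature is taken from the patch above; the try/catch structure, the close() call, and the rethrow are assumptions): since createHiveSparkClient now declares "throws Exception", callers are expected to catch broadly.

  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClient;
  import org.apache.hadoop.hive.ql.exec.spark.HiveSparkClientFactory;

  public class CreateClientExample {
    public static void main(String[] args) {
      HiveConf hiveConf = new HiveConf();
      try {
        // May block briefly while prewarmed containers come up (see RemoteHiveSparkClient below).
        HiveSparkClient client = HiveSparkClientFactory.createHiveSparkClient(hiveConf);
        // ... submit Spark work through the client ...
        client.close();
      } catch (Exception e) {
        throw new RuntimeException("Failed to create Hive on Spark client", e);
      }
    }
  }
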
http://git-wip-us.apache.org/repos/asf/hive/blob/537114b9/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 4073d2b..92167e4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -59,7 +60,6 @@ import org.apache.hive.spark.client.SparkClientFactory;
 import org.apache.hive.spark.client.SparkClientUtilities;
 import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.spark.SparkConf;
-import org.apache.spark.SparkException;
 import org.apache.spark.api.java.JavaFutureAction;
 import org.apache.spark.api.java.JavaPairRDD;
 
@@ -85,11 +85,56 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
 
   private final transient long sparkClientTimtout;
 
-  RemoteHiveSparkClient(HiveConf hiveConf, Map<String, String> conf) throws IOException, SparkException {
+  RemoteHiveSparkClient(HiveConf hiveConf, Map<String, String> conf) throws Exception {
     this.hiveConf = hiveConf;
+    sparkClientTimtout = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT,
+        TimeUnit.SECONDS);
     sparkConf = HiveSparkClientFactory.generateSparkConf(conf);
     remoteClient = SparkClientFactory.createClient(conf, hiveConf);
-    sparkClientTimtout = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT, TimeUnit.SECONDS);
+
+    if (HiveConf.getBoolVar(hiveConf, ConfVars.SPARK_PREWARM_CONTAINERS) &&
+        hiveConf.get("spark.master").startsWith("yarn-")) {
+      int minExecutors = getExecutorsToWarm();
+      if (minExecutors <= 0) {
+        return;
+      }
+
+      LOG.info("Prewarm Spark executors. The minimum number of executors to warm is " + minExecutors);
+
+      // Spend at most 60s to wait for executors to come up.
+      int curExecutors = 0;
+      long ts = System.currentTimeMillis();
+      do {
+        curExecutors = getExecutorCount();
+        if (curExecutors >= minExecutors) {
+          LOG.info("Finished prewarming Spark executors. The current number of executors is " + curExecutors);
+          return;
+        }
+        Thread.sleep(1000); // sleep 1 second
+      } while (System.currentTimeMillis() - ts < 60000);
+
+      LOG.info("Timeout (60s) occurred while prewarming executors. The current number of executors is " + curExecutors);
+    }
+  }
+
+  /**
+   * Note that this method closely follows the Spark 1.4.1 documentation for
+   * dynamic allocation, in particular its default values.
+   * @return the minimum number of executors to wait for during prewarming
+   */
+  private int getExecutorsToWarm() {
+    int minExecutors =
+        HiveConf.getIntVar(hiveConf, HiveConf.ConfVars.SPARK_PREWARM_NUM_CONTAINERS);
+    boolean dynamicAllocation = hiveConf.getBoolean("spark.dynamicAllocation.enabled", false);
+    if (dynamicAllocation) {
+      int min = sparkConf.getInt("spark.dynamicAllocation.minExecutors", 0);
+      int initExecutors = sparkConf.getInt("spark.dynamicAllocation.initialExecutors", min);
+      minExecutors = Math.min(minExecutors, initExecutors);
+    } else {
+      int execInstances = sparkConf.getInt("spark.executor.instances", 2);
+      minExecutors = Math.min(minExecutors, execInstances);
+    }
+    return minExecutors;
   }
 
   @Override
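
To make the prewarm sizing above concrete, here is a small, self-contained sketch of the same min() calculation under dynamic allocation (the SparkConf keys, their defaults, and the 10-container prewarm default come from the patch; the concrete values 1 and 3 and the standalone class are illustrative assumptions):

  import org.apache.spark.SparkConf;

  public class PrewarmSizingExample {
    public static void main(String[] args) {
      SparkConf sparkConf = new SparkConf()
          .set("spark.dynamicAllocation.enabled", "true")
          .set("spark.dynamicAllocation.minExecutors", "1")
          .set("spark.dynamicAllocation.initialExecutors", "3");

      int prewarmTarget = 10; // default of hive.spark.prewarm.num.containers in the patch

      int min = sparkConf.getInt("spark.dynamicAllocation.minExecutors", 0);
      int initExecutors = sparkConf.getInt("spark.dynamicAllocation.initialExecutors", min);
      int toWarm = Math.min(prewarmTarget, initExecutors);

      // Prints 3: with dynamic allocation enabled, the client waits (for at most 60 seconds)
      // until min(hive.spark.prewarm.num.containers, spark.dynamicAllocation.initialExecutors)
      // executors have registered before the first query runs.
      System.out.println("Executors to warm: " + toWarm);
    }
  }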