You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2015/08/02 05:59:35 UTC
[50/50] hive git commit: HIVE-11363: Prewarm Hive on Spark containers
[Spark Branch] (reviewed by Chao)
HIVE-11363: Prewarm Hive on Spark containers [Spark Branch] (reviewed by Chao)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d8e79a6c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d8e79a6c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d8e79a6c
Branch: refs/heads/branch-1
Commit: d8e79a6c02636bab40b1432479353780885fecf0
Parents: d7413e4
Author: xzhang <xz...@xzdt>
Authored: Thu Jul 30 12:48:31 2015 -0700
Committer: xzhang <xz...@xzdt>
Committed: Sat Aug 1 20:57:13 2015 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 5 ++
.../ql/exec/spark/HiveSparkClientFactory.java | 5 +-
.../ql/exec/spark/RemoteHiveSparkClient.java | 51 ++++++++++++++++++--
3 files changed, 54 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/d8e79a6c/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 54e9cbb..3544142 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2218,6 +2218,11 @@ public class HiveConf extends Configuration {
SPARK_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE(
"hive.spark.dynamic.partition.pruning.max.data.size", 100*1024*1024L,
"Maximum total data size in dynamic pruning."),
+ SPARK_PREWARM_CONTAINERS("hive.spark.prewarm.containers", false, "Whether to prewarm containers for Spark. " +
+ "If enabled, Hive will wait up to 60 seconds for the containers to come up " +
+ "before any query can be executed."),
+ SPARK_PREWARM_NUM_CONTAINERS("hive.spark.prewarm.num.containers", 10, "The minimum number of containers to be prewarmed for Spark. " +
+ "Applicable only if hive.spark.prewarm.containers is set to true."),
NWAYJOINREORDER("hive.reorder.nway.joins", true,
"Runs reordering of tables within single n-way join (i.e.: picks streamtable)"),
HIVE_LOG_N_RECORDS("hive.log.every.n.records", 0L, new RangeValidator(0L, null),
http://git-wip-us.apache.org/repos/asf/hive/blob/d8e79a6c/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index 007db75..e12a97d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hive.spark.client.rpc.RpcConfiguration;
import org.apache.spark.SparkConf;
-import org.apache.spark.SparkException;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
@@ -52,9 +51,7 @@ public class HiveSparkClientFactory {
private static final String SPARK_DEFAULT_SERIALIZER = "org.apache.spark.serializer.KryoSerializer";
private static final String SPARK_DEFAULT_REFERENCE_TRACKING = "false";
- public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf)
- throws IOException, SparkException {
-
+ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Exception {
Map<String, String> sparkConf = initiateSparkConf(hiveconf);
// Submit spark job through local spark context while spark master is local mode, otherwise submit
// spark job through remote spark context.
http://git-wip-us.apache.org/repos/asf/hive/blob/d8e79a6c/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 4073d2b..92167e4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -59,7 +60,6 @@ import org.apache.hive.spark.client.SparkClientFactory;
import org.apache.hive.spark.client.SparkClientUtilities;
import org.apache.hive.spark.counter.SparkCounters;
import org.apache.spark.SparkConf;
-import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaPairRDD;
@@ -85,11 +85,56 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
private final transient long sparkClientTimtout;
- RemoteHiveSparkClient(HiveConf hiveConf, Map<String, String> conf) throws IOException, SparkException {
+ RemoteHiveSparkClient(HiveConf hiveConf, Map<String, String> conf) throws Exception {
this.hiveConf = hiveConf;
+ sparkClientTimtout = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT,
+ TimeUnit.SECONDS);
sparkConf = HiveSparkClientFactory.generateSparkConf(conf);
remoteClient = SparkClientFactory.createClient(conf, hiveConf);
- sparkClientTimtout = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT, TimeUnit.SECONDS);
+
+ if (HiveConf.getBoolVar(hiveConf, ConfVars.SPARK_PREWARM_CONTAINERS) &&
+ sparkConf.get("spark.master", "").startsWith("yarn-")) { // merged conf; never null
+ int minExecutors = getExecutorsToWarm();
+ if (minExecutors <= 0) {
+ return;
+ }
+
+ LOG.info("Prewarm Spark executors. The minimum number of executors to warm is " + minExecutors);
+
+ // Poll the executor count once per second, giving up after at most 60s.
+ int curExecutors = 0;
+ long ts = System.currentTimeMillis();
+ do {
+ curExecutors = getExecutorCount();
+ if (curExecutors >= minExecutors) {
+ LOG.info("Finished prewarming Spark executors. The current number of executors is " + curExecutors);
+ return;
+ }
+ Thread.sleep(1000); // sleep 1 second
+ } while (System.currentTimeMillis() - ts < 60000);
+
+ LOG.info("Timeout (60s) occurred while prewarming executors. The current number of executors is " + curExecutors);
+ }
+ }
+
+ /**
+ * Returns the number of executors to wait for. Defaults mirror the Spark 1.4.1 docs on
+ * dynamic allocation (initialExecutors falls back to minExecutors=0; executor.instances=2).
+ * @return min(hive.spark.prewarm.num.containers, initial executor count configured for Spark)
+ */
+ private int getExecutorsToWarm() {
+ int minExecutors =
+ HiveConf.getIntVar(hiveConf, HiveConf.ConfVars.SPARK_PREWARM_NUM_CONTAINERS);
+ boolean dynamicAllocation = sparkConf.getBoolean("spark.dynamicAllocation.enabled", false);
+ if (dynamicAllocation) {
+ int min = sparkConf.getInt("spark.dynamicAllocation.minExecutors", 0);
+ int initExecutors = sparkConf.getInt("spark.dynamicAllocation.initialExecutors", min);
+ minExecutors = Math.min(minExecutors, initExecutors);
+ } else {
+ int execInstances = sparkConf.getInt("spark.executor.instances", 2);
+ minExecutors = Math.min(minExecutors, execInstances);
+ }
+ return minExecutors;
}
@Override