You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@hive.apache.org by se...@apache.org on 2015/08/06 02:50:25 UTC
[19/53] [abbrv] hive git commit: HIVE-11363: Prewarm Hive on Spark
containers [Spark Branch] (reviewed by Chao)
HIVE-11363: Prewarm Hive on Spark containers [Spark Branch] (reviewed by Chao)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/537114b9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/537114b9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/537114b9
Branch: refs/heads/llap
Commit: 537114b964c71b7a5cd00c9938eadc6d0cf76536
Parents: 89736c8
Author: xzhang <xz...@xzdt>
Authored: Thu Jul 30 12:48:31 2015 -0700
Committer: xzhang <xz...@xzdt>
Committed: Thu Jul 30 12:48:31 2015 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 5 ++
.../ql/exec/spark/HiveSparkClientFactory.java | 5 +-
.../ql/exec/spark/RemoteHiveSparkClient.java | 51 ++++++++++++++++++--
3 files changed, 54 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/537114b9/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index bee1756..098e7bd 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2217,6 +2217,11 @@ public class HiveConf extends Configuration {
SPARK_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE(
"hive.spark.dynamic.partition.pruning.max.data.size", 100*1024*1024L,
"Maximum total data size in dynamic pruning."),
+ SPARK_PREWARM_CONTAINERS("hive.spark.prewarm.containers", false, "Whether to prewarm containers for Spark. " +
+ "If enabled, Hive will spend no more than 60 seconds to wait for the containers to come up " +
+ "before any query can be executed."),
+ SPARK_PREWARM_NUM_CONTAINERS("hive.spark.prewarm.num.containers", 10, "The minimum number of containers to be prewarmed for Spark. " +
+ "Applicable only if hive.spark.prewarm.containers is set to true."),
NWAYJOINREORDER("hive.reorder.nway.joins", true,
"Runs reordering of tables within single n-way join (i.e.: picks streamtable)"),
HIVE_LOG_N_RECORDS("hive.log.every.n.records", 0L, new RangeValidator(0L, null),
http://git-wip-us.apache.org/repos/asf/hive/blob/537114b9/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index 007db75..e12a97d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hive.spark.client.rpc.RpcConfiguration;
import org.apache.spark.SparkConf;
-import org.apache.spark.SparkException;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
@@ -52,9 +51,7 @@ public class HiveSparkClientFactory {
private static final String SPARK_DEFAULT_SERIALIZER = "org.apache.spark.serializer.KryoSerializer";
private static final String SPARK_DEFAULT_REFERENCE_TRACKING = "false";
- public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf)
- throws IOException, SparkException {
-
+ public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Exception {
Map<String, String> sparkConf = initiateSparkConf(hiveconf);
// Submit spark job through local spark context while spark master is local mode, otherwise submit
// spark job through remote spark context.
http://git-wip-us.apache.org/repos/asf/hive/blob/537114b9/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 4073d2b..92167e4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -59,7 +60,6 @@ import org.apache.hive.spark.client.SparkClientFactory;
import org.apache.hive.spark.client.SparkClientUtilities;
import org.apache.hive.spark.counter.SparkCounters;
import org.apache.spark.SparkConf;
-import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaPairRDD;
@@ -85,11 +85,56 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
private final transient long sparkClientTimtout;
- RemoteHiveSparkClient(HiveConf hiveConf, Map<String, String> conf) throws IOException, SparkException {
+ RemoteHiveSparkClient(HiveConf hiveConf, Map<String, String> conf) throws Exception {
this.hiveConf = hiveConf;
+ sparkClientTimtout = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT,
+ TimeUnit.SECONDS);
sparkConf = HiveSparkClientFactory.generateSparkConf(conf);
remoteClient = SparkClientFactory.createClient(conf, hiveConf);
- sparkClientTimtout = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT, TimeUnit.SECONDS);
+
+ if (HiveConf.getBoolVar(hiveConf, ConfVars.SPARK_PREWARM_CONTAINERS) &&
+ hiveConf.get("spark.master", "").startsWith("yarn-")) { // unset master: skip prewarm instead of NPE
+ int minExecutors = getExecutorsToWarm();
+ if (minExecutors <= 0) {
+ return;
+ }
+
+ LOG.info("Prewarm Spark executors. The minimum number of executors to warm is " + minExecutors);
+
+ // Spend at most 60s waiting; nanoTime is monotonic, so wall-clock changes cannot skew the bound.
+ int curExecutors = 0;
+ long ts = System.nanoTime();
+ do {
+ curExecutors = getExecutorCount();
+ if (curExecutors >= minExecutors) {
+ LOG.info("Finished prewarming Spark executors. The current number of executors is " + curExecutors);
+ return;
+ }
+ Thread.sleep(1000); // poll once per second
+ } while (System.nanoTime() - ts < TimeUnit.SECONDS.toNanos(60));
+
+ LOG.info("Timeout (60s) occurred while prewarming executors. The current number of executors is " + curExecutors);
+ }
+ }
+
+ /**
+ * Computes how many executors to wait for: hive.spark.prewarm.num.containers, capped by the
+ * initial (dynamic allocation) or fixed executor count. Defaults follow Spark 1.4.1 documentation.
+ * @return the number of executors to prewarm; a value <= 0 means there is nothing to wait for
+ */
+ private int getExecutorsToWarm() {
+ int minExecutors =
+ HiveConf.getIntVar(hiveConf, HiveConf.ConfVars.SPARK_PREWARM_NUM_CONTAINERS);
+ boolean dynamicAllocation = sparkConf.getBoolean("spark.dynamicAllocation.enabled", false);
+ if (dynamicAllocation) {
+ int min = sparkConf.getInt("spark.dynamicAllocation.minExecutors", 0);
+ int initExecutors = sparkConf.getInt("spark.dynamicAllocation.initialExecutors", min);
+ minExecutors = Math.min(minExecutors, initExecutors);
+ } else {
+ int execInstances = sparkConf.getInt("spark.executor.instances", 2);
+ minExecutors = Math.min(minExecutors, execInstances);
+ }
+ return minExecutors;
}
@Override