You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/08/06 02:50:45 UTC

[39/53] [abbrv] hive git commit: HIVE-11434: Followup for HIVE-10166: reuse existing configurations for prewarming Spark executors (reviewed by Chao)

HIVE-11434: Followup for HIVE-10166: reuse existing configurations for prewarming Spark executors (reviewed by Chao)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5bb2506f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5bb2506f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5bb2506f

Branch: refs/heads/llap
Commit: 5bb2506f8e8e448748379682bc8d8f986244cc6b
Parents: cb64890
Author: Xuefu Zhang <xz...@Cloudera.com>
Authored: Mon Aug 3 19:14:20 2015 -0700
Committer: Xuefu Zhang <xz...@Cloudera.com>
Committed: Mon Aug 3 19:14:20 2015 -0700

----------------------------------------------------------------------
 .../src/java/org/apache/hadoop/hive/conf/HiveConf.java |  9 ++-------
 .../hive/ql/exec/spark/RemoteHiveSparkClient.java      | 13 ++++++-------
 .../org/apache/hive/spark/client/SparkClientImpl.java  |  1 -
 3 files changed, 8 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/5bb2506f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index f593d7d..fe4eb9f 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2109,8 +2109,8 @@ public class HiveConf extends Configuration {
     HIVE_AM_SPLIT_GENERATION("hive.compute.splits.in.am", true,
         "Whether to generate the splits locally or in the AM (tez only)"),
 
-    HIVE_PREWARM_ENABLED("hive.prewarm.enabled", false, "Enables container prewarm for Tez (Hadoop 2 only)"),
-    HIVE_PREWARM_NUM_CONTAINERS("hive.prewarm.numcontainers", 10, "Controls the number of containers to prewarm for Tez (Hadoop 2 only)"),
+    HIVE_PREWARM_ENABLED("hive.prewarm.enabled", false, "Enables container prewarm for Tez/Spark (Hadoop 2 only)"),
+    HIVE_PREWARM_NUM_CONTAINERS("hive.prewarm.numcontainers", 10, "Controls the number of containers to prewarm for Tez/Spark (Hadoop 2 only)"),
 
     HIVESTAGEIDREARRANGE("hive.stageid.rearrange", "none", new StringSet("none", "idonly", "traverse", "execution"), ""),
     HIVEEXPLAINDEPENDENCYAPPENDTASKTYPES("hive.explain.dependency.append.tasktype", false, ""),
@@ -2220,11 +2220,6 @@ public class HiveConf extends Configuration {
     SPARK_DYNAMIC_PARTITION_PRUNING_MAX_DATA_SIZE(
         "hive.spark.dynamic.partition.pruning.max.data.size", 100*1024*1024L,
         "Maximum total data size in dynamic pruning."),
-    SPARK_PREWARM_CONTAINERS("hive.spark.prewarm.containers", false, "Whether to prewarn containers for Spark." +
-      "If enabled, Hive will spend no more than 60 seconds to wait for the containers to come up " +
-      "before any query can be executed."),
-    SPARK_PREWARM_NUM_CONTAINERS("hive.spark.prewarm.num.containers", 10, "The minimum number of containers to be prewarmed for Spark." +
-      "Applicable only if hive.spark.prewarm.containers is set to true."),
     NWAYJOINREORDER("hive.reorder.nway.joins", true,
       "Runs reordering of tables within single n-way join (i.e.: picks streamtable)"),
     HIVE_LOG_N_RECORDS("hive.log.every.n.records", 0L, new RangeValidator(0L, null),

http://git-wip-us.apache.org/repos/asf/hive/blob/5bb2506f/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 92167e4..7d43160 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -71,9 +71,8 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
   private static final long serialVersionUID = 1L;
 
   private static final String MR_JAR_PROPERTY = "tmpjars";
-  protected static final transient Log LOG = LogFactory
-    .getLog(RemoteHiveSparkClient.class);
-
+  private static final transient Log LOG = LogFactory.getLog(RemoteHiveSparkClient.class);
+  private static final long MAX_PREWARM_TIME = 30000; // 30s
   private static final transient Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings();
 
   private transient SparkClient remoteClient;
@@ -92,7 +91,7 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
     sparkConf = HiveSparkClientFactory.generateSparkConf(conf);
     remoteClient = SparkClientFactory.createClient(conf, hiveConf);
 
-    if (HiveConf.getBoolVar(hiveConf, ConfVars.SPARK_PREWARM_CONTAINERS) &&
+    if (HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_PREWARM_ENABLED) &&
         hiveConf.get("spark.master").startsWith("yarn-")) {
       int minExecutors = getExecutorsToWarm();
       if (minExecutors <= 0) {
@@ -101,7 +100,7 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
 
       LOG.info("Prewarm Spark executors. The minimum number of executors to warm is " + minExecutors);
 
-      // Spend at most 60s to wait for executors to come up.
+      // Wait at most MAX_PREWARM_TIME milliseconds for executors to come up.
       int curExecutors = 0;
       long ts = System.currentTimeMillis();
       do {
@@ -111,7 +110,7 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
           return;
         }
         Thread.sleep(1000); // sleep 1 second
-      } while (System.currentTimeMillis() - ts < 60000);
+      } while (System.currentTimeMillis() - ts < MAX_PREWARM_TIME);
 
       LOG.info("Timeout (60s) occurred while prewarming executors. The current number of executors is " + curExecutors);
     }
@@ -124,7 +123,7 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
    */
   private int getExecutorsToWarm() {
     int minExecutors =
-        HiveConf.getIntVar(hiveConf, HiveConf.ConfVars.SPARK_PREWARM_NUM_CONTAINERS);
+        HiveConf.getIntVar(hiveConf, HiveConf.ConfVars.HIVE_PREWARM_NUM_CONTAINERS);
     boolean dynamicAllocation = hiveConf.getBoolean("spark.dynamicAllocation.enabled", false);
     if (dynamicAllocation) {
       int min = sparkConf.getInt("spark.dynamicAllocation.minExecutors", 0);

http://git-wip-us.apache.org/repos/asf/hive/blob/5bb2506f/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index 60baa31..e1e64a7 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -53,7 +53,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hive.spark.client.rpc.Rpc;
 import org.apache.hive.spark.client.rpc.RpcConfiguration;
 import org.apache.hive.spark.client.rpc.RpcServer;