Posted to commits@hive.apache.org by li...@apache.org on 2017/01/11 09:02:24 UTC

hive git commit: HIVE-15299: Yarn-cluster and yarn-client deprecated in Spark 2.0 (Rui reviewed by Xuefu)

Repository: hive
Updated Branches:
  refs/heads/master c219ce581 -> e56b60f59


HIVE-15299: Yarn-cluster and yarn-client deprecated in Spark 2.0 (Rui reviewed by Xuefu)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e56b60f5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e56b60f5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e56b60f5

Branch: refs/heads/master
Commit: e56b60f59177b11342f82a746a65756df94d4561
Parents: c219ce5
Author: Rui Li <li...@apache.org>
Authored: Wed Jan 11 17:02:06 2017 +0800
Committer: Rui Li <li...@apache.org>
Committed: Wed Jan 11 17:02:06 2017 +0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  3 +-
 .../apache/hadoop/hive/conf/TestHiveConf.java   |  2 +-
 data/conf/spark/yarn-client/hive-site.xml       |  7 ++++-
 .../ql/exec/spark/HiveSparkClientFactory.java   | 31 ++++++++++++++++----
 .../ql/exec/spark/RemoteHiveSparkClient.java    |  8 +++--
 .../hive/ql/exec/spark/SparkUtilities.java      |  8 +++--
 .../hive/spark/client/SparkClientImpl.java      |  7 +++--
 .../hive/spark/client/SparkClientUtilities.java | 29 ++++++++++++++++++
 8 files changed, 79 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/e56b60f5/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 16f6c1c..b97def4 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3567,8 +3567,7 @@ public class HiveConf extends Configuration {
       result = !name.equals("spark.app.name");
     } else if (name.startsWith("yarn")) { // YARN property in Spark on YARN mode.
       String sparkMaster = get("spark.master");
-      if (sparkMaster != null &&
-        (sparkMaster.equals("yarn-client") || sparkMaster.equals("yarn-cluster"))) {
+      if (sparkMaster != null && sparkMaster.startsWith("yarn")) {
         result = true;
       }
     } else if (name.startsWith("hive.spark")) { // Remote Spark Context property.
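
Spark 2.0 collapses the legacy "yarn-client" and "yarn-cluster" master strings into the single master "yarn", with the deploy mode carried separately in spark.submit.deployMode, so one prefix test now covers both the new and the legacy spellings. A minimal standalone sketch of the updated check (the method name isYarnPropertyForSpark is hypothetical; HiveConf performs this test inline):

    // Treats a "yarn*" property as a Spark-on-YARN property: matches
    // "yarn", "yarn-client" and "yarn-cluster" with a single prefix test.
    static boolean isYarnPropertyForSpark(String name, String sparkMaster) {
      return name.startsWith("yarn")
          && sparkMaster != null
          && sparkMaster.startsWith("yarn");
    }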

http://git-wip-us.apache.org/repos/asf/hive/blob/e56b60f5/common/src/test/org/apache/hadoop/hive/conf/TestHiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/test/org/apache/hadoop/hive/conf/TestHiveConf.java b/common/src/test/org/apache/hadoop/hive/conf/TestHiveConf.java
index f88573f..7f6175b 100644
--- a/common/src/test/org/apache/hadoop/hive/conf/TestHiveConf.java
+++ b/common/src/test/org/apache/hadoop/hive/conf/TestHiveConf.java
@@ -150,7 +150,7 @@ public class TestHiveConf {
     HiveConf conf = new HiveConf();
     Assert.assertFalse(conf.getSparkConfigUpdated());
 
-    conf.verifyAndSet("spark.master", "yarn-cluster");
+    conf.verifyAndSet("spark.master", "yarn");
     Assert.assertTrue(conf.getSparkConfigUpdated());
     conf.verifyAndSet("hive.execution.engine", "spark");
     Assert.assertTrue("Expected spark config updated.", conf.getSparkConfigUpdated());

http://git-wip-us.apache.org/repos/asf/hive/blob/e56b60f5/data/conf/spark/yarn-client/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/spark/yarn-client/hive-site.xml b/data/conf/spark/yarn-client/hive-site.xml
index 4e63245..9cda40d 100644
--- a/data/conf/spark/yarn-client/hive-site.xml
+++ b/data/conf/spark/yarn-client/hive-site.xml
@@ -201,7 +201,12 @@
 
 <property>
   <name>spark.master</name>
-  <value>yarn-cluster</value>
+  <value>yarn</value>
+</property>
+
+<property>
+  <name>spark.submit.deployMode</name>
+  <value>cluster</value>
 </property>
 
 <property>
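
The test configuration now spells out the master and the deploy mode as two properties, mirroring how the same submission is written on the Spark 2.0 command line with the standard --master and --deploy-mode flags. The same pair expressed programmatically, as a sketch (values illustrative):

    import org.apache.spark.SparkConf;

    // Equivalent to: spark-submit --master yarn --deploy-mode cluster
    SparkConf conf = new SparkConf()
        .set("spark.master", "yarn")
        .set("spark.submit.deployMode", "cluster");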

http://git-wip-us.apache.org/repos/asf/hive/blob/e56b60f5/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index d71a84c..1e97cd4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hive.common.LogUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.spark.client.SparkClientUtilities;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -52,11 +53,13 @@ public class HiveSparkClientFactory {
   protected static final transient Logger LOG = LoggerFactory.getLogger(HiveSparkClientFactory.class);
 
   private static final String SPARK_DEFAULT_CONF_FILE = "spark-defaults.conf";
-  private static final String SPARK_DEFAULT_MASTER = "yarn-cluster";
+  private static final String SPARK_DEFAULT_MASTER = "yarn";
+  private static final String SPARK_DEFAULT_DEPLOY_MODE = "cluster";
   private static final String SPARK_DEFAULT_APP_NAME = "Hive on Spark";
   private static final String SPARK_DEFAULT_SERIALIZER = "org.apache.spark.serializer.KryoSerializer";
   private static final String SPARK_DEFAULT_REFERENCE_TRACKING = "false";
   private static final String SPARK_WAIT_APP_COMPLETE = "spark.yarn.submit.waitAppCompletion";
+  private static final String SPARK_DEPLOY_MODE = "spark.submit.deployMode";
 
   public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Exception {
     Map<String, String> sparkConf = initiateSparkConf(hiveconf);
@@ -125,10 +128,27 @@ public class HiveSparkClientFactory {
       sparkMaster = sparkConf.get("spark.master");
       hiveConf.set("spark.master", sparkMaster);
     }
+    String deployMode = null;
+    if (!SparkClientUtilities.isLocalMaster(sparkMaster)) {
+      deployMode = hiveConf.get(SPARK_DEPLOY_MODE);
+      if (deployMode == null) {
+        deployMode = sparkConf.get(SPARK_DEPLOY_MODE);
+        if (deployMode == null) {
+          deployMode = SparkClientUtilities.getDeployModeFromMaster(sparkMaster);
+        }
+        if (deployMode == null) {
+          deployMode = SPARK_DEFAULT_DEPLOY_MODE;
+        }
+        hiveConf.set(SPARK_DEPLOY_MODE, deployMode);
+      }
+    }
     if (SessionState.get() != null && SessionState.get().getConf() != null) {
       SessionState.get().getConf().set("spark.master", sparkMaster);
+      if (deployMode != null) {
+        SessionState.get().getConf().set(SPARK_DEPLOY_MODE, deployMode);
+      }
     }
-    if (sparkMaster.equals("yarn-cluster")) {
+    if (SparkClientUtilities.isYarnClusterMode(sparkMaster, deployMode)) {
       sparkConf.put("spark.yarn.maxAppAttempts", "1");
     }
     for (Map.Entry<String, String> entry : hiveConf) {
@@ -140,7 +160,7 @@ public class HiveSparkClientFactory {
           "load spark property from hive configuration (%s -> %s).",
           propertyName, LogUtils.maskIfPassword(propertyName,value)));
       } else if (propertyName.startsWith("yarn") &&
-        (sparkMaster.equals("yarn-client") || sparkMaster.equals("yarn-cluster"))) {
+          SparkClientUtilities.isYarnMaster(sparkMaster)) {
         String value = hiveConf.get(propertyName);
         // Add spark.hadoop prefix for yarn properties as SparkConf only accept properties
         // started with spark prefix, Spark would remove spark.hadoop prefix lately and add
@@ -184,7 +204,7 @@ public class HiveSparkClientFactory {
 
     // set yarn queue name
     final String sparkQueueNameKey = "spark.yarn.queue";
-    if (sparkMaster.startsWith("yarn") && hiveConf.get(sparkQueueNameKey) == null) {
+    if (SparkClientUtilities.isYarnMaster(sparkMaster) && hiveConf.get(sparkQueueNameKey) == null) {
       String queueName = hiveConf.get("mapreduce.job.queuename");
       if (queueName != null) {
         sparkConf.put(sparkQueueNameKey, queueName);
@@ -192,7 +212,8 @@ public class HiveSparkClientFactory {
     }
 
     // Disable it to avoid verbose app state report in yarn-cluster mode
-    if (sparkMaster.equals("yarn-cluster") && sparkConf.get(SPARK_WAIT_APP_COMPLETE) == null) {
+    if (SparkClientUtilities.isYarnClusterMode(sparkMaster, deployMode) &&
+        sparkConf.get(SPARK_WAIT_APP_COMPLETE) == null) {
       sparkConf.put(SPARK_WAIT_APP_COMPLETE, "false");
     }
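
The deploy-mode block in the first hunk above resolves the mode with a defined precedence: an explicit spark.submit.deployMode in the Hive configuration wins, then the value from spark-defaults.conf, then a mode implied by a legacy master string, and finally the "cluster" default. A condensed sketch of that order, under a hypothetical helper name resolveDeployMode (the real code also writes the result back into the configuration, which this sketch omits):

    // Precedence: HiveConf > spark-defaults.conf > legacy master suffix > default.
    static String resolveDeployMode(String fromHiveConf, String fromSparkDefaults,
        String master) {
      if (fromHiveConf != null) {
        return fromHiveConf;
      }
      if (fromSparkDefaults != null) {
        return fromSparkDefaults;
      }
      String implied = SparkClientUtilities.getDeployModeFromMaster(master);
      return implied != null ? implied : "cluster"; // SPARK_DEFAULT_DEPLOY_MODE
    }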
 

http://git-wip-us.apache.org/repos/asf/hive/blob/e56b60f5/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index a705dfc..6caf2b7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -100,7 +100,7 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
     remoteClient = SparkClientFactory.createClient(conf, hiveConf);
 
     if (HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_PREWARM_ENABLED) &&
-        hiveConf.get("spark.master").startsWith("yarn-")) {
+        SparkClientUtilities.isYarnMaster(hiveConf.get("spark.master"))) {
       int minExecutors = getExecutorsToWarm();
       if (minExecutors <= 0) {
         return;
@@ -172,8 +172,10 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
   }
 
   @Override
-  public SparkJobRef execute(final DriverContext driverContext, final SparkWork sparkWork) throws Exception {
-    if (hiveConf.get("spark.master").startsWith("yarn-") && !remoteClient.isActive()) {
+  public SparkJobRef execute(final DriverContext driverContext, final SparkWork sparkWork)
+      throws Exception {
+    if (SparkClientUtilities.isYarnMaster(hiveConf.get("spark.master")) &&
+        !remoteClient.isActive()) {
       // Re-create the remote client if not active any more
       close();
       createRemoteClient();

http://git-wip-us.apache.org/repos/asf/hive/blob/e56b60f5/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index 630b598..7d18c0a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.spark.client.SparkClientUtilities;
 import org.apache.spark.Dependency;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
@@ -87,7 +88,10 @@ public class SparkUtilities {
 
   // checks if a resource has to be uploaded to HDFS for yarn-cluster mode
   public static boolean needUploadToHDFS(URI source, SparkConf sparkConf) {
-    return sparkConf.get("spark.master").equals("yarn-cluster") &&
+    String master = sparkConf.get("spark.master");
+    String deployMode = sparkConf.contains("spark.submit.deployMode") ?
+        sparkConf.get("spark.submit.deployMode") : null;
+    return SparkClientUtilities.isYarnClusterMode(master, deployMode) &&
         !source.getScheme().equals("hdfs");
   }
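
Note the contains() guard before reading spark.submit.deployMode: SparkConf.get(String) throws NoSuchElementException for an unset key, so the deploy mode is passed as null when absent and isYarnClusterMode falls back to the master string alone. A hedged usage sketch with illustrative values:

    import java.net.URI;
    import org.apache.spark.SparkConf;

    SparkConf conf = new SparkConf()
        .set("spark.master", "yarn")
        .set("spark.submit.deployMode", "cluster");
    // true: a file:// resource must be uploaded in yarn cluster mode.
    boolean upload = SparkUtilities.needUploadToHDFS(
        URI.create("file:///tmp/my-udf.jar"), conf);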
 
@@ -102,7 +106,7 @@ public class SparkUtilities {
 
   public static boolean isDedicatedCluster(Configuration conf) {
     String master = conf.get("spark.master");
-    return master.startsWith("yarn-") || master.startsWith("local");
+    return SparkClientUtilities.isYarnMaster(master) || SparkClientUtilities.isLocalMaster(master);
   }
 
   public static SparkSession getSparkSession(HiveConf conf,

http://git-wip-us.apache.org/repos/asf/hive/blob/e56b60f5/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index 5191e1f..0da40dd 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -333,6 +333,7 @@ class SparkClientImpl implements SparkClient {
       // SparkSubmit will take care of that for us.
       String master = conf.get("spark.master");
       Preconditions.checkArgument(master != null, "spark.master is not defined.");
+      String deployMode = conf.get("spark.submit.deployMode");
 
       List<String> argv = Lists.newArrayList();
 
@@ -342,7 +343,9 @@ class SparkClientImpl implements SparkClient {
         LOG.info("No spark.home provided, calling SparkSubmit directly.");
         argv.add(new File(System.getProperty("java.home"), "bin/java").getAbsolutePath());
 
-        if (master.startsWith("local") || master.startsWith("mesos") || master.endsWith("-client") || master.startsWith("spark")) {
+        if (master.startsWith("local") || master.startsWith("mesos") ||
+            SparkClientUtilities.isYarnClientMode(master, deployMode) ||
+            master.startsWith("spark")) {
           String mem = conf.get("spark.driver.memory");
           if (mem != null) {
             argv.add("-Xms" + mem);
@@ -383,7 +386,7 @@ class SparkClientImpl implements SparkClient {
           argv.add(keyTabFile);
       }
 
-      if (master.equals("yarn-cluster")) {
+      if (SparkClientUtilities.isYarnClusterMode(master, deployMode)) {
         String executorCores = conf.get("spark.executor.cores");
         if (executorCores != null) {
           argv.add("--executor-cores");

http://git-wip-us.apache.org/repos/asf/hive/blob/e56b60f5/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
index 6251861..9ef3f38 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
@@ -106,4 +106,33 @@ public class SparkClientUtilities {
     }
     return url;
   }
+
+  public static boolean isYarnClusterMode(String master, String deployMode) {
+    return "yarn-cluster".equals(master) ||
+        ("yarn".equals(master) && "cluster".equals(deployMode));
+  }
+
+  public static boolean isYarnClientMode(String master, String deployMode) {
+    return "yarn-client".equals(master) ||
+        ("yarn".equals(master) && "client".equals(deployMode));
+  }
+
+  public static boolean isYarnMaster(String master) {
+    return master != null && master.startsWith("yarn");
+  }
+
+  public static boolean isLocalMaster(String master) {
+    return master != null && master.startsWith("local");
+  }
+
+  public static String getDeployModeFromMaster(String master) {
+    if (master != null) {
+      if (master.equals("yarn-client")) {
+        return "client";
+      } else if (master.equals("yarn-cluster")) {
+        return "cluster";
+      }
+    }
+    return null;
+  }
 }
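
The new helpers accept both the legacy combined master strings and the Spark 2.0 master/deploy-mode pair, so callers no longer need to know which form was configured. A short usage sketch of the expected results:

    // Legacy and Spark 2.0 spellings resolve to the same answers.
    SparkClientUtilities.isYarnClusterMode("yarn-cluster", null);   // true
    SparkClientUtilities.isYarnClusterMode("yarn", "cluster");      // true
    SparkClientUtilities.isYarnClientMode("yarn", "client");       // true
    SparkClientUtilities.isYarnMaster("yarn-client");              // true
    SparkClientUtilities.isLocalMaster("local[*]");                // true
    SparkClientUtilities.getDeployModeFromMaster("yarn-cluster");  // "cluster"
    SparkClientUtilities.getDeployModeFromMaster("yarn");          // null (mode not implied)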