You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@hive.apache.org by li...@apache.org on 2017/01/11 09:02:24 UTC
hive git commit: HIVE-15299: Yarn-cluster and yarn-client deprecated
in Spark 2.0 (Rui reviewed by Xuefu)
Repository: hive
Updated Branches:
refs/heads/master c219ce581 -> e56b60f59
HIVE-15299: Yarn-cluster and yarn-client deprecated in Spark 2.0 (Rui reviewed by Xuefu)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e56b60f5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e56b60f5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e56b60f5
Branch: refs/heads/master
Commit: e56b60f59177b11342f82a746a65756df94d4561
Parents: c219ce5
Author: Rui Li <li...@apache.org>
Authored: Wed Jan 11 17:02:06 2017 +0800
Committer: Rui Li <li...@apache.org>
Committed: Wed Jan 11 17:02:06 2017 +0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 3 +-
.../apache/hadoop/hive/conf/TestHiveConf.java | 2 +-
data/conf/spark/yarn-client/hive-site.xml | 7 ++++-
.../ql/exec/spark/HiveSparkClientFactory.java | 31 ++++++++++++++++----
.../ql/exec/spark/RemoteHiveSparkClient.java | 8 +++--
.../hive/ql/exec/spark/SparkUtilities.java | 8 +++--
.../hive/spark/client/SparkClientImpl.java | 7 +++--
.../hive/spark/client/SparkClientUtilities.java | 29 ++++++++++++++++++
8 files changed, 79 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/e56b60f5/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 16f6c1c..b97def4 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3567,8 +3567,7 @@ public class HiveConf extends Configuration {
result = !name.equals("spark.app.name");
} else if (name.startsWith("yarn")) { // YARN property in Spark on YARN mode.
String sparkMaster = get("spark.master");
- if (sparkMaster != null &&
- (sparkMaster.equals("yarn-client") || sparkMaster.equals("yarn-cluster"))) {
+ if (sparkMaster != null && sparkMaster.startsWith("yarn")) {
result = true;
}
} else if (name.startsWith("hive.spark")) { // Remote Spark Context property.
http://git-wip-us.apache.org/repos/asf/hive/blob/e56b60f5/common/src/test/org/apache/hadoop/hive/conf/TestHiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/test/org/apache/hadoop/hive/conf/TestHiveConf.java b/common/src/test/org/apache/hadoop/hive/conf/TestHiveConf.java
index f88573f..7f6175b 100644
--- a/common/src/test/org/apache/hadoop/hive/conf/TestHiveConf.java
+++ b/common/src/test/org/apache/hadoop/hive/conf/TestHiveConf.java
@@ -150,7 +150,7 @@ public class TestHiveConf {
HiveConf conf = new HiveConf();
Assert.assertFalse(conf.getSparkConfigUpdated());
- conf.verifyAndSet("spark.master", "yarn-cluster");
+ conf.verifyAndSet("spark.master", "yarn");
Assert.assertTrue(conf.getSparkConfigUpdated());
conf.verifyAndSet("hive.execution.engine", "spark");
Assert.assertTrue("Expected spark config updated.", conf.getSparkConfigUpdated());
http://git-wip-us.apache.org/repos/asf/hive/blob/e56b60f5/data/conf/spark/yarn-client/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/spark/yarn-client/hive-site.xml b/data/conf/spark/yarn-client/hive-site.xml
index 4e63245..9cda40d 100644
--- a/data/conf/spark/yarn-client/hive-site.xml
+++ b/data/conf/spark/yarn-client/hive-site.xml
@@ -201,7 +201,12 @@
<property>
<name>spark.master</name>
- <value>yarn-cluster</value>
+ <value>yarn</value>
+</property>
+
+<property>
+ <name>spark.submit.deployMode</name>
+ <value>cluster</value>
</property>
<property>
http://git-wip-us.apache.org/repos/asf/hive/blob/e56b60f5/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index d71a84c..1e97cd4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hive.common.LogUtils;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.spark.client.SparkClientUtilities;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -52,11 +53,13 @@ public class HiveSparkClientFactory {
protected static final transient Logger LOG = LoggerFactory.getLogger(HiveSparkClientFactory.class);
private static final String SPARK_DEFAULT_CONF_FILE = "spark-defaults.conf";
- private static final String SPARK_DEFAULT_MASTER = "yarn-cluster";
+ private static final String SPARK_DEFAULT_MASTER = "yarn";
+ private static final String SPARK_DEFAULT_DEPLOY_MODE = "cluster";
private static final String SPARK_DEFAULT_APP_NAME = "Hive on Spark";
private static final String SPARK_DEFAULT_SERIALIZER = "org.apache.spark.serializer.KryoSerializer";
private static final String SPARK_DEFAULT_REFERENCE_TRACKING = "false";
private static final String SPARK_WAIT_APP_COMPLETE = "spark.yarn.submit.waitAppCompletion";
+ private static final String SPARK_DEPLOY_MODE = "spark.submit.deployMode";
public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf) throws Exception {
Map<String, String> sparkConf = initiateSparkConf(hiveconf);
@@ -125,10 +128,27 @@ public class HiveSparkClientFactory {
sparkMaster = sparkConf.get("spark.master");
hiveConf.set("spark.master", sparkMaster);
}
+ String deployMode = null;
+ if (!SparkClientUtilities.isLocalMaster(sparkMaster)) {
+ deployMode = hiveConf.get(SPARK_DEPLOY_MODE);
+ if (deployMode == null) {
+ deployMode = sparkConf.get(SPARK_DEPLOY_MODE);
+ if (deployMode == null) {
+ deployMode = SparkClientUtilities.getDeployModeFromMaster(sparkMaster);
+ }
+ if (deployMode == null) {
+ deployMode = SPARK_DEFAULT_DEPLOY_MODE;
+ }
+ hiveConf.set(SPARK_DEPLOY_MODE, deployMode);
+ }
+ }
if (SessionState.get() != null && SessionState.get().getConf() != null) {
SessionState.get().getConf().set("spark.master", sparkMaster);
+ if (deployMode != null) {
+ SessionState.get().getConf().set(SPARK_DEPLOY_MODE, deployMode);
+ }
}
- if (sparkMaster.equals("yarn-cluster")) {
+ if (SparkClientUtilities.isYarnClusterMode(sparkMaster, deployMode)) {
sparkConf.put("spark.yarn.maxAppAttempts", "1");
}
for (Map.Entry<String, String> entry : hiveConf) {
@@ -140,7 +160,7 @@ public class HiveSparkClientFactory {
"load spark property from hive configuration (%s -> %s).",
propertyName, LogUtils.maskIfPassword(propertyName,value)));
} else if (propertyName.startsWith("yarn") &&
- (sparkMaster.equals("yarn-client") || sparkMaster.equals("yarn-cluster"))) {
+ SparkClientUtilities.isYarnMaster(sparkMaster)) {
String value = hiveConf.get(propertyName);
// Add spark.hadoop prefix for yarn properties as SparkConf only accept properties
// started with spark prefix, Spark would remove spark.hadoop prefix lately and add
@@ -184,7 +204,7 @@ public class HiveSparkClientFactory {
// set yarn queue name
final String sparkQueueNameKey = "spark.yarn.queue";
- if (sparkMaster.startsWith("yarn") && hiveConf.get(sparkQueueNameKey) == null) {
+ if (SparkClientUtilities.isYarnMaster(sparkMaster) && hiveConf.get(sparkQueueNameKey) == null) {
String queueName = hiveConf.get("mapreduce.job.queuename");
if (queueName != null) {
sparkConf.put(sparkQueueNameKey, queueName);
@@ -192,7 +212,8 @@ public class HiveSparkClientFactory {
}
// Disable it to avoid verbose app state report in yarn-cluster mode
- if (sparkMaster.equals("yarn-cluster") && sparkConf.get(SPARK_WAIT_APP_COMPLETE) == null) {
+ if (SparkClientUtilities.isYarnClusterMode(sparkMaster, deployMode) &&
+ sparkConf.get(SPARK_WAIT_APP_COMPLETE) == null) {
sparkConf.put(SPARK_WAIT_APP_COMPLETE, "false");
}
http://git-wip-us.apache.org/repos/asf/hive/blob/e56b60f5/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index a705dfc..6caf2b7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -100,7 +100,7 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
remoteClient = SparkClientFactory.createClient(conf, hiveConf);
if (HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_PREWARM_ENABLED) &&
- hiveConf.get("spark.master").startsWith("yarn-")) {
+ SparkClientUtilities.isYarnMaster(hiveConf.get("spark.master"))) {
int minExecutors = getExecutorsToWarm();
if (minExecutors <= 0) {
return;
@@ -172,8 +172,10 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
}
@Override
- public SparkJobRef execute(final DriverContext driverContext, final SparkWork sparkWork) throws Exception {
- if (hiveConf.get("spark.master").startsWith("yarn-") && !remoteClient.isActive()) {
+ public SparkJobRef execute(final DriverContext driverContext, final SparkWork sparkWork)
+ throws Exception {
+ if (SparkClientUtilities.isYarnMaster(hiveConf.get("spark.master")) &&
+ !remoteClient.isActive()) {
// Re-create the remote client if not active any more
close();
createRemoteClient();
http://git-wip-us.apache.org/repos/asf/hive/blob/e56b60f5/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index 630b598..7d18c0a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.SparkWork;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.io.BytesWritable;
+import org.apache.hive.spark.client.SparkClientUtilities;
import org.apache.spark.Dependency;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
@@ -87,7 +88,10 @@ public class SparkUtilities {
// checks if a resource has to be uploaded to HDFS for yarn-cluster mode
public static boolean needUploadToHDFS(URI source, SparkConf sparkConf) {
- return sparkConf.get("spark.master").equals("yarn-cluster") &&
+ String master = sparkConf.get("spark.master");
+ String deployMode = sparkConf.contains("spark.submit.deployMode") ?
+ sparkConf.get("spark.submit.deployMode") : null;
+ return SparkClientUtilities.isYarnClusterMode(master, deployMode) &&
!source.getScheme().equals("hdfs");
}
@@ -102,7 +106,7 @@ public class SparkUtilities {
public static boolean isDedicatedCluster(Configuration conf) {
String master = conf.get("spark.master");
- return master.startsWith("yarn-") || master.startsWith("local");
+ return SparkClientUtilities.isYarnMaster(master) || SparkClientUtilities.isLocalMaster(master);
}
public static SparkSession getSparkSession(HiveConf conf,
http://git-wip-us.apache.org/repos/asf/hive/blob/e56b60f5/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index 5191e1f..0da40dd 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -333,6 +333,7 @@ class SparkClientImpl implements SparkClient {
// SparkSubmit will take care of that for us.
String master = conf.get("spark.master");
Preconditions.checkArgument(master != null, "spark.master is not defined.");
+ String deployMode = conf.get("spark.submit.deployMode");
List<String> argv = Lists.newArrayList();
@@ -342,7 +343,9 @@ class SparkClientImpl implements SparkClient {
LOG.info("No spark.home provided, calling SparkSubmit directly.");
argv.add(new File(System.getProperty("java.home"), "bin/java").getAbsolutePath());
- if (master.startsWith("local") || master.startsWith("mesos") || master.endsWith("-client") || master.startsWith("spark")) {
+ if (master.startsWith("local") || master.startsWith("mesos") ||
+ SparkClientUtilities.isYarnClientMode(master, deployMode) ||
+ master.startsWith("spark")) {
String mem = conf.get("spark.driver.memory");
if (mem != null) {
argv.add("-Xms" + mem);
@@ -383,7 +386,7 @@ class SparkClientImpl implements SparkClient {
argv.add(keyTabFile);
}
- if (master.equals("yarn-cluster")) {
+ if (SparkClientUtilities.isYarnClusterMode(master, deployMode)) {
String executorCores = conf.get("spark.executor.cores");
if (executorCores != null) {
argv.add("--executor-cores");
http://git-wip-us.apache.org/repos/asf/hive/blob/e56b60f5/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
index 6251861..9ef3f38 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
@@ -106,4 +106,33 @@ public class SparkClientUtilities {
}
return url;
}
+
+ public static boolean isYarnClusterMode(String master, String deployMode) {
+ return "yarn-cluster".equals(master) ||
+ ("yarn".equals(master) && "cluster".equals(deployMode));
+ }
+
+ public static boolean isYarnClientMode(String master, String deployMode) {
+ return "yarn-client".equals(master) ||
+ ("yarn".equals(master) && "client".equals(deployMode));
+ }
+
+ public static boolean isYarnMaster(String master) {
+ return master != null && master.startsWith("yarn");
+ }
+
+ public static boolean isLocalMaster(String master) {
+ return master != null && master.startsWith("local");
+ }
+
+ public static String getDeployModeFromMaster(String master) {
+ if (master != null) {
+ if (master.equals("yarn-client")) {
+ return "client";
+ } else if (master.equals("yarn-cluster")) {
+ return "cluster";
+ }
+ }
+ return null;
+ }
}