You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2015/01/26 20:05:57 UTC
svn commit: r1654873 - in /hive/branches/spark:
common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
Author: xuefu
Date: Mon Jan 26 19:05:57 2015
New Revision: 1654873
URL: http://svn.apache.org/r1654873
Log:
HIVE-9449: Push YARN configuration to Spark while deploying Spark on YARN [Spark Branch] (Chengxiang via Xuefu)
Modified:
hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
Modified: hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1654873&r1=1654872&r2=1654873&view=diff
==============================================================================
--- hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Mon Jan 26 19:05:57 2015
@@ -2258,10 +2258,33 @@ public class HiveConf extends Configurat
throw new IllegalArgumentException("Cannot modify " + name + " at runtime. It is in the list"
+ "of parameters that can't be modified at runtime");
}
- isSparkConfigUpdated = name.startsWith("spark");
+ isSparkConfigUpdated = isSparkRelatedConfig(name);
set(name, value);
}
+ /**
+ * check whether spark related property is updated, which includes spark configurations,
+ * RSC configurations and yarn configuration in Spark on YARN mode.
+ * @param name
+ * @return
+ */
+ private boolean isSparkRelatedConfig(String name) {
+ boolean result = false;
+ if (name.startsWith("spark")) { // Spark property.
+ result = true;
+ } else if (name.startsWith("yarn")) { // YARN property in Spark on YARN mode.
+ String sparkMaster = get("spark.master");
+ if (sparkMaster != null &&
+ (sparkMaster.equals("yarn-client") || sparkMaster.equals("yarn-cluster"))) {
+ result = true;
+ }
+ } else if (name.startsWith("hive.spark")) { // Remote Spark Context property.
+ result = true;
+ }
+
+ return result;
+ }
+
public static int getIntVar(Configuration conf, ConfVars var) {
assert (var.valClass == Integer.class) : var.varname;
return conf.getInt(var.varname, var.defaultIntVal);
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java?rev=1654873&r1=1654872&r2=1654873&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java Mon Jan 26 19:05:57 2015
@@ -42,6 +42,7 @@ public class HiveSparkClientFactory {
private static final String SPARK_DEFAULT_CONF_FILE = "spark-defaults.conf";
private static final String SPARK_DEFAULT_MASTER = "local";
private static final String SPARK_DEFAULT_APP_NAME = "Hive on Spark";
+ private static final String SPARK_DEFAULT_SERIALIZER = "org.apache.spark.serializer.KryoSerializer";
public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf)
throws IOException, SparkException {
@@ -64,8 +65,7 @@ public class HiveSparkClientFactory {
// set default spark configurations.
sparkConf.put("spark.master", SPARK_DEFAULT_MASTER);
sparkConf.put("spark.app.name", SPARK_DEFAULT_APP_NAME);
- sparkConf.put("spark.serializer",
- "org.apache.spark.serializer.KryoSerializer");
+ sparkConf.put("spark.serializer", SPARK_DEFAULT_SERIALIZER);
// load properties from spark-defaults.conf.
InputStream inputStream = null;
@@ -81,7 +81,7 @@ public class HiveSparkClientFactory {
String value = properties.getProperty(propertyName);
sparkConf.put(propertyName, properties.getProperty(propertyName));
LOG.info(String.format(
- "load spark configuration from %s (%s -> %s).",
+ "load spark property from %s (%s -> %s).",
SPARK_DEFAULT_CONF_FILE, propertyName, value));
}
}
@@ -99,22 +99,36 @@ public class HiveSparkClientFactory {
}
}
- // load properties from hive configurations, including both spark.* properties
- // and properties for remote driver RPC.
+ // load properties from hive configurations, including both spark.* properties,
+ // properties for remote driver RPC, and yarn properties for Spark on YARN mode.
+ String sparkMaster = hiveConf.get("spark.master");
+ if (sparkMaster == null) {
+ sparkMaster = sparkConf.get("spark.master");
+ }
for (Map.Entry<String, String> entry : hiveConf) {
String propertyName = entry.getKey();
if (propertyName.startsWith("spark")) {
String value = hiveConf.get(propertyName);
sparkConf.put(propertyName, value);
LOG.info(String.format(
- "load spark configuration from hive configuration (%s -> %s).",
+ "load spark property from hive configuration (%s -> %s).",
propertyName, value));
+ } else if (propertyName.startsWith("yarn") &&
+ (sparkMaster.equals("yarn-client") || sparkMaster.equals("yarn-cluster"))) {
+ String value = hiveConf.get(propertyName);
+ // Add spark.hadoop prefix for yarn properties as SparkConf only accept properties
+ // started with spark prefix, Spark would remove spark.hadoop prefix lately and add
+ // it to its hadoop configuration.
+ sparkConf.put("spark.hadoop." + propertyName, value);
+ LOG.info(String.format(
+ "load yarn property from hive configuration in %s mode (%s -> %s).",
+ sparkMaster, propertyName, value));
}
if (RpcConfiguration.HIVE_SPARK_RSC_CONFIGS.contains(propertyName)) {
String value = RpcConfiguration.getValue(hiveConf, propertyName);
sparkConf.put(propertyName, value);
LOG.info(String.format(
- "load RPC configuration from hive configuration (%s -> %s).",
+ "load RPC property from hive configuration (%s -> %s).",
propertyName, value));
}
}