You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2015/01/26 20:05:57 UTC

svn commit: r1654873 - in /hive/branches/spark: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java

Author: xuefu
Date: Mon Jan 26 19:05:57 2015
New Revision: 1654873

URL: http://svn.apache.org/r1654873
Log:
HIVE-9449: Push YARN configuration to Spark while deploying Spark on YARN [Spark Branch] (Chengxiang via Xuefu)

Modified:
    hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java

Modified: hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1654873&r1=1654872&r2=1654873&view=diff
==============================================================================
--- hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Mon Jan 26 19:05:57 2015
@@ -2258,10 +2258,33 @@ public class HiveConf extends Configurat
       throw new IllegalArgumentException("Cannot modify " + name + " at runtime. It is in the list"
           + "of parameters that can't be modified at runtime");
     }
-    isSparkConfigUpdated = name.startsWith("spark");
+    isSparkConfigUpdated = isSparkRelatedConfig(name);
     set(name, value);
   }
 
+  /**
+   * Checks whether a property name is Spark related: it starts with "spark", it starts with
+   * "hive.spark", or it starts with "yarn" while spark.master is yarn-client or yarn-cluster.
+   * @param name name of the property to classify; must not be null
+   * @return true if the name matches one of the cases above, false otherwise
+   */
+  private boolean isSparkRelatedConfig(String name) {
+    boolean result = false;
+    if (name.startsWith("spark")) { // Spark property.
+      result = true;
+    } else if (name.startsWith("yarn")) { // YARN property in Spark on YARN mode.
+      String sparkMaster = get("spark.master");
+      if (sparkMaster != null &&
+        (sparkMaster.equals("yarn-client") || sparkMaster.equals("yarn-cluster"))) {
+        result = true;
+      }
+    } else if (name.startsWith("hive.spark")) { // Remote Spark Context property.
+      result = true;
+    }
+
+    return result;
+  }
+
   public static int getIntVar(Configuration conf, ConfVars var) {
     assert (var.valClass == Integer.class) : var.varname;
     return conf.getInt(var.varname, var.defaultIntVal);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java?rev=1654873&r1=1654872&r2=1654873&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java Mon Jan 26 19:05:57 2015
@@ -42,6 +42,7 @@ public class HiveSparkClientFactory {
   private static final String SPARK_DEFAULT_CONF_FILE = "spark-defaults.conf";
   private static final String SPARK_DEFAULT_MASTER = "local";
   private static final String SPARK_DEFAULT_APP_NAME = "Hive on Spark";
+  private static final String SPARK_DEFAULT_SERIALIZER = "org.apache.spark.serializer.KryoSerializer";
 
   public static HiveSparkClient createHiveSparkClient(HiveConf hiveconf)
     throws IOException, SparkException {
@@ -64,8 +65,7 @@ public class HiveSparkClientFactory {
     // set default spark configurations.
     sparkConf.put("spark.master", SPARK_DEFAULT_MASTER);
     sparkConf.put("spark.app.name", SPARK_DEFAULT_APP_NAME);
-    sparkConf.put("spark.serializer",
-      "org.apache.spark.serializer.KryoSerializer");
+    sparkConf.put("spark.serializer", SPARK_DEFAULT_SERIALIZER);
 
     // load properties from spark-defaults.conf.
     InputStream inputStream = null;
@@ -81,7 +81,7 @@ public class HiveSparkClientFactory {
             String value = properties.getProperty(propertyName);
             sparkConf.put(propertyName, properties.getProperty(propertyName));
             LOG.info(String.format(
-              "load spark configuration from %s (%s -> %s).",
+              "load spark property from %s (%s -> %s).",
               SPARK_DEFAULT_CONF_FILE, propertyName, value));
           }
         }
@@ -99,22 +99,36 @@ public class HiveSparkClientFactory {
       }
     }
 
-    // load properties from hive configurations, including both spark.* properties
-    // and properties for remote driver RPC.
+    // load properties from hive configurations, including both spark.* properties,
+    // properties for remote driver RPC, and yarn properties for Spark on YARN mode.
+    String sparkMaster = hiveConf.get("spark.master");
+    if (sparkMaster == null) {
+      sparkMaster = sparkConf.get("spark.master");
+    }
     for (Map.Entry<String, String> entry : hiveConf) {
       String propertyName = entry.getKey();
       if (propertyName.startsWith("spark")) {
         String value = hiveConf.get(propertyName);
         sparkConf.put(propertyName, value);
         LOG.info(String.format(
-          "load spark configuration from hive configuration (%s -> %s).",
+          "load spark property from hive configuration (%s -> %s).",
           propertyName, value));
+      } else if (propertyName.startsWith("yarn") &&
+        (sparkMaster.equals("yarn-client") || sparkMaster.equals("yarn-cluster"))) {
+        String value = hiveConf.get(propertyName);
+        // Add spark.hadoop prefix for yarn properties as SparkConf only accept properties
+        // started with spark prefix, Spark would remove spark.hadoop prefix lately and add
+        // it to its hadoop configuration.
+        sparkConf.put("spark.hadoop." + propertyName, value);
+        LOG.info(String.format(
+          "load yarn property from hive configuration in %s mode (%s -> %s).",
+          sparkMaster, propertyName, value));
       }
       if (RpcConfiguration.HIVE_SPARK_RSC_CONFIGS.contains(propertyName)) {
         String value = RpcConfiguration.getValue(hiveConf, propertyName);
         sparkConf.put(propertyName, value);
         LOG.info(String.format(
-          "load RPC configuration from hive configuration (%s -> %s).",
+          "load RPC property from hive configuration (%s -> %s).",
           propertyName, value));
       }
     }