You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2018/03/09 18:36:44 UTC

spark git commit: [SPARK-23630][YARN] Allow user's hadoop conf customizations to take effect.

Repository: spark
Updated Branches:
  refs/heads/master d90e77bd0 -> 2c3673680


[SPARK-23630][YARN] Allow user's hadoop conf customizations to take effect.

This change restores functionality that was inadvertently removed as part
of the fix for SPARK-22372.

Also modified an existing unit test to make sure the feature works as intended.

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #20776 from vanzin/SPARK-23630.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c367368
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c367368
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c367368

Branch: refs/heads/master
Commit: 2c3673680e16f88f1d1cd73a3f7445ded5b3daa8
Parents: d90e77b
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Fri Mar 9 10:36:38 2018 -0800
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Fri Mar 9 10:36:38 2018 -0800

----------------------------------------------------------------------
 .../apache/spark/deploy/SparkHadoopUtil.scala   | 11 ++++++-
 .../org/apache/spark/deploy/yarn/Client.scala   | 14 ++++----
 .../spark/deploy/yarn/YarnClusterSuite.scala    | 34 +++++++++++++++-----
 3 files changed, 44 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2c367368/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index e14f984..177295f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -111,7 +111,9 @@ class SparkHadoopUtil extends Logging {
    * subsystems.
    */
   def newConfiguration(conf: SparkConf): Configuration = {
-    SparkHadoopUtil.newConfiguration(conf)
+    val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
+    hadoopConf.addResource(SparkHadoopUtil.SPARK_HADOOP_CONF_FILE)
+    hadoopConf
   }
 
   /**
@@ -435,6 +437,13 @@ object SparkHadoopUtil {
    */
   private[spark] val UPDATE_INPUT_METRICS_INTERVAL_RECORDS = 1000
 
+  /**
+   * Name of the file containing the gateway's Hadoop configuration, to be overlayed on top of the
+   * cluster's Hadoop config. It is up to the Spark code launching the application to create
+   * this file if it's desired. If the file doesn't exist, it will just be ignored.
+   */
+  private[spark] val SPARK_HADOOP_CONF_FILE = "__spark_hadoop_conf__.xml"
+
   def get: SparkHadoopUtil = instance
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/2c367368/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 8cd3cd9..28087de 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -696,7 +696,13 @@ private[spark] class Client(
       }
     }
 
-    Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR").foreach { envKey =>
+    // SPARK-23630: during testing, Spark scripts filter out hadoop conf dirs so that user's
+    // environments do not interfere with tests. This allows a special env variable during
+    // tests so that custom conf dirs can be used by unit tests.
+    val confDirs = Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR") ++
+      (if (Utils.isTesting) Seq("SPARK_TEST_HADOOP_CONF_DIR") else Nil)
+
+    confDirs.foreach { envKey =>
       sys.env.get(envKey).foreach { path =>
         val dir = new File(path)
         if (dir.isDirectory()) {
@@ -753,7 +759,7 @@ private[spark] class Client(
 
       // Save the YARN configuration into a separate file that will be overlayed on top of the
       // cluster's Hadoop conf.
-      confStream.putNextEntry(new ZipEntry(SPARK_HADOOP_CONF_FILE))
+      confStream.putNextEntry(new ZipEntry(SparkHadoopUtil.SPARK_HADOOP_CONF_FILE))
       hadoopConf.writeXml(confStream)
       confStream.closeEntry()
 
@@ -1220,10 +1226,6 @@ private object Client extends Logging {
   // Name of the file in the conf archive containing Spark configuration.
   val SPARK_CONF_FILE = "__spark_conf__.properties"
 
-  // Name of the file containing the gateway's Hadoop configuration, to be overlayed on top of the
-  // cluster's Hadoop config.
-  val SPARK_HADOOP_CONF_FILE = "__spark_hadoop_conf__.xml"
-
   // Subdirectory where the user's python files (not archives) will be placed.
   val LOCALIZED_PYTHON_DIR = "__pyfiles__"
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2c367368/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 5003326..33d400a 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -114,12 +114,25 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
       ))
   }
 
-  test("yarn-cluster should respect conf overrides in SparkHadoopUtil (SPARK-16414)") {
+  test("yarn-cluster should respect conf overrides in SparkHadoopUtil (SPARK-16414, SPARK-23630)") {
+    // Create a custom hadoop config file, to make sure it's contents are propagated to the driver.
+    val customConf = Utils.createTempDir()
+    val coreSite = """<?xml version="1.0" encoding="UTF-8"?>
+      |<configuration>
+      |  <property>
+      |    <name>spark.test.key</name>
+      |    <value>testvalue</value>
+      |  </property>
+      |</configuration>
+      |""".stripMargin
+    Files.write(coreSite, new File(customConf, "core-site.xml"), StandardCharsets.UTF_8)
+
     val result = File.createTempFile("result", null, tempDir)
     val finalState = runSpark(false,
       mainClassName(YarnClusterDriverUseSparkHadoopUtilConf.getClass),
-      appArgs = Seq("key=value", result.getAbsolutePath()),
-      extraConf = Map("spark.hadoop.key" -> "value"))
+      appArgs = Seq("key=value", "spark.test.key=testvalue", result.getAbsolutePath()),
+      extraConf = Map("spark.hadoop.key" -> "value"),
+      extraEnv = Map("SPARK_TEST_HADOOP_CONF_DIR" -> customConf.getAbsolutePath()))
     checkResult(finalState, result)
   }
 
@@ -319,13 +332,13 @@ private object YarnClusterDriverWithFailure extends Logging with Matchers {
 
 private object YarnClusterDriverUseSparkHadoopUtilConf extends Logging with Matchers {
   def main(args: Array[String]): Unit = {
-    if (args.length != 2) {
+    if (args.length < 2) {
       // scalastyle:off println
       System.err.println(
         s"""
         |Invalid command line: ${args.mkString(" ")}
         |
-        |Usage: YarnClusterDriverUseSparkHadoopUtilConf [hadoopConfKey=value] [result file]
+        |Usage: YarnClusterDriverUseSparkHadoopUtilConf [hadoopConfKey=value]+ [result file]
         """.stripMargin)
       // scalastyle:on println
       System.exit(1)
@@ -335,11 +348,16 @@ private object YarnClusterDriverUseSparkHadoopUtilConf extends Logging with Matc
       .set("spark.extraListeners", classOf[SaveExecutorInfo].getName)
       .setAppName("yarn test using SparkHadoopUtil's conf"))
 
-    val kv = args(0).split("=")
-    val status = new File(args(1))
+    val kvs = args.take(args.length - 1).map { kv =>
+      val parsed = kv.split("=")
+      (parsed(0), parsed(1))
+    }
+    val status = new File(args.last)
     var result = "failure"
     try {
-      SparkHadoopUtil.get.conf.get(kv(0)) should be (kv(1))
+      kvs.foreach { case (k, v) =>
+        SparkHadoopUtil.get.conf.get(k) should be (v)
+      }
       result = "success"
     } finally {
       Files.write(result, status, StandardCharsets.UTF_8)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org