You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2017/07/14 21:32:25 UTC

spark git commit: [SPARK-9825][YARN] Do not overwrite final Hadoop config entries.

Repository: spark
Updated Branches:
  refs/heads/master cb8d5cc90 -> 601a237b3


[SPARK-9825][YARN] Do not overwrite final Hadoop config entries.

When localizing the gateway config files in a YARN application, avoid
overwriting final configs by distributing the gateway files to a separate
directory, and explicitly loading them into the Hadoop config, instead
of placing those files before the cluster's files in the classpath.

This is done by saving the gateway's config to a separate XML file
distributed with the rest of the Spark app's config, and loading that
file when creating a new config through `YarnSparkHadoopUtil`.

Tested with existing unit tests, and by verifying the behavior in a YARN
cluster (final values are not overridden, non-final values are).

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #18370 from vanzin/SPARK-9825.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/601a237b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/601a237b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/601a237b

Branch: refs/heads/master
Commit: 601a237b3033340856a8715d84025ecd8eb8fdba
Parents: cb8d5cc
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Fri Jul 14 14:32:19 2017 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Fri Jul 14 14:32:19 2017 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/yarn/Client.scala   | 59 ++++++++++++++------
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala |  7 ++-
 2 files changed, 48 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/601a237b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index a5b0e19..3a7adb7 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -683,21 +683,6 @@ private[spark] class Client(
   private def createConfArchive(): File = {
     val hadoopConfFiles = new HashMap[String, File]()
 
-    // Uploading $SPARK_CONF_DIR/log4j.properties file to the distributed cache to make sure that
-    // the executors will use the latest configurations instead of the default values. This is
-    // required when user changes log4j.properties directly to set the log configurations. If
-    // configuration file is provided through --files then executors will be taking configurations
-    // from --files instead of $SPARK_CONF_DIR/log4j.properties.
-
-    // Also uploading metrics.properties to distributed cache if exists in classpath.
-    // If user specify this file using --files then executors will use the one
-    // from --files instead.
-    for { prop <- Seq("log4j.properties", "metrics.properties")
-          url <- Option(Utils.getContextOrSparkClassLoader.getResource(prop))
-          if url.getProtocol == "file" } {
-      hadoopConfFiles(prop) = new File(url.getPath)
-    }
-
     Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR").foreach { envKey =>
       sys.env.get(envKey).foreach { path =>
         val dir = new File(path)
@@ -722,14 +707,43 @@ private[spark] class Client(
 
     try {
       confStream.setLevel(0)
+
+      // Upload the $SPARK_CONF_DIR/log4j.properties file to the distributed cache to make sure
+      // that the executors use the latest configuration instead of the default values. This is
+      // required when the user changes log4j.properties directly to set the log configuration.
+      // If a configuration file is provided through --files, executors take their configuration
+      // from --files instead of $SPARK_CONF_DIR/log4j.properties.
+
+      // Also upload metrics.properties to the distributed cache if it exists in the classpath.
+      // If the user specifies this file using --files, then executors will use the one
+      // from --files instead.
+      for { prop <- Seq("log4j.properties", "metrics.properties")
+            url <- Option(Utils.getContextOrSparkClassLoader.getResource(prop))
+            if url.getProtocol == "file" } {
+        val file = new File(url.getPath())
+        confStream.putNextEntry(new ZipEntry(file.getName()))
+        Files.copy(file, confStream)
+        confStream.closeEntry()
+      }
+
+      // Save the Hadoop config files under a separate directory in the archive. This directory
+      // is appended to the classpath so that the cluster-provided configuration takes precedence.
+      confStream.putNextEntry(new ZipEntry(s"$LOCALIZED_HADOOP_CONF_DIR/"))
+      confStream.closeEntry()
       hadoopConfFiles.foreach { case (name, file) =>
         if (file.canRead()) {
-          confStream.putNextEntry(new ZipEntry(name))
+          confStream.putNextEntry(new ZipEntry(s"$LOCALIZED_HADOOP_CONF_DIR/$name"))
           Files.copy(file, confStream)
           confStream.closeEntry()
         }
       }
 
+      // Save the YARN configuration into a separate file that will be overlaid on top of the
+      // cluster's Hadoop conf.
+      confStream.putNextEntry(new ZipEntry(SPARK_HADOOP_CONF_FILE))
+      yarnConf.writeXml(confStream)
+      confStream.closeEntry()
+
       // Save Spark configuration to a file in the archive.
       val props = new Properties()
       sparkConf.getAll.foreach { case (k, v) => props.setProperty(k, v) }
@@ -1196,12 +1210,19 @@ private object Client extends Logging {
   // Subdirectory where the user's Spark and Hadoop config files will be placed.
   val LOCALIZED_CONF_DIR = "__spark_conf__"
 
+  // Subdirectory in the conf directory containing Hadoop config files.
+  val LOCALIZED_HADOOP_CONF_DIR = "__hadoop_conf__"
+
   // File containing the conf archive in the AM. See prepareLocalResources().
   val LOCALIZED_CONF_ARCHIVE = LOCALIZED_CONF_DIR + ".zip"
 
   // Name of the file in the conf archive containing Spark configuration.
   val SPARK_CONF_FILE = "__spark_conf__.properties"
 
+  // Name of the file containing the gateway's Hadoop configuration, to be overlaid on top of the
+  // cluster's Hadoop config.
+  val SPARK_HADOOP_CONF_FILE = "__spark_hadoop_conf__.xml"
+
   // Subdirectory where the user's python files (not archives) will be placed.
   val LOCALIZED_PYTHON_DIR = "__pyfiles__"
 
@@ -1307,6 +1328,12 @@ private object Client extends Logging {
     sys.env.get(ENV_DIST_CLASSPATH).foreach { cp =>
       addClasspathEntry(getClusterPath(sparkConf, cp), env)
     }
+
+    // Add the localized Hadoop config at the end of the classpath, in case it contains other
+    // files (such as configuration files for different services) that are not part of the
+    // YARN cluster's config.
+    addClasspathEntry(
+      buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, LOCALIZED_HADOOP_CONF_DIR), env)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/601a237b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 4522071..a687f67 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -61,8 +61,11 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
 
   // Return an appropriate subclass of Configuration. Creating a config initializes some Hadoop
   // subsystems. Always create a new config, don't reuse yarnConf.
-  override def newConfiguration(conf: SparkConf): Configuration =
-    new YarnConfiguration(super.newConfiguration(conf))
+  override def newConfiguration(conf: SparkConf): Configuration = {
+    val hadoopConf = new YarnConfiguration(super.newConfiguration(conf))
+    hadoopConf.addResource(Client.SPARK_HADOOP_CONF_FILE)
+    hadoopConf
+  }
 
   // Add any user credentials to the job conf which are necessary for running on a secure Hadoop
   // cluster


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org