You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by tg...@apache.org on 2015/04/17 21:22:21 UTC

spark git commit: [SPARK-2669] [yarn] Distribute client configuration to AM.

Repository: spark
Updated Branches:
  refs/heads/master c84d91692 -> 50ab8a654


[SPARK-2669] [yarn] Distribute client configuration to AM.

Currently, when Spark launches the Yarn AM, the process will use
the local Hadoop configuration on the node where the AM launches,
if one is present. A more correct approach is to use the same
configuration used to launch the Spark job, since the user may
have made modifications (such as adding app-specific configs).

The approach taken here is to use the distributed cache to make
all files in the Hadoop configuration directory available to the
AM. This is a little overkill since only the AM needs them (the
executors use the broadcast Hadoop configuration from the driver),
but is the easier approach.

Even though only a few files in that directory may end up being
used, all of them are uploaded. This allows supporting use cases
such as when auxiliary configuration files are used for SSL
configuration, or when uploading a Hive configuration directory.
Not all of these may be reflected in a o.a.h.conf.Configuration object,
but may be needed when a driver in cluster mode instantiates, for
example, a HiveConf object instead.

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #4142 from vanzin/SPARK-2669 and squashes the following commits:

f5434b9 [Marcelo Vanzin] Merge branch 'master' into SPARK-2669
013f0fb [Marcelo Vanzin] Review feedback.
f693152 [Marcelo Vanzin] Le sigh.
ed45b7d [Marcelo Vanzin] Zip all config files and upload them as an archive.
5927b6b [Marcelo Vanzin] Merge branch 'master' into SPARK-2669
cbb9fb3 [Marcelo Vanzin] Remove stale test.
e3e58d0 [Marcelo Vanzin] Merge branch 'master' into SPARK-2669
e3d0613 [Marcelo Vanzin] Review feedback.
34bdbd8 [Marcelo Vanzin] Fix test.
022a688 [Marcelo Vanzin] Merge branch 'master' into SPARK-2669
a77ddd5 [Marcelo Vanzin] Merge branch 'master' into SPARK-2669
79221c7 [Marcelo Vanzin] [SPARK-2669] [yarn] Distribute client configuration to AM.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/50ab8a65
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/50ab8a65
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/50ab8a65

Branch: refs/heads/master
Commit: 50ab8a6543ad5c31e89c16df374d0cb13222fd1e
Parents: c84d916
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Fri Apr 17 14:21:51 2015 -0500
Committer: Thomas Graves <tg...@apache.org>
Committed: Fri Apr 17 14:21:51 2015 -0500

----------------------------------------------------------------------
 docs/running-on-yarn.md                         |   6 +-
 .../org/apache/spark/deploy/yarn/Client.scala   | 125 ++++++++++++++++---
 .../spark/deploy/yarn/ExecutorRunnable.scala    |   2 +-
 .../apache/spark/deploy/yarn/ClientSuite.scala  |  29 +++--
 .../spark/deploy/yarn/YarnClusterSuite.scala    |   6 +-
 5 files changed, 132 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/50ab8a65/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 853c9f2..0968fc5 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -211,7 +211,11 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
 # Launching Spark on YARN
 
 Ensure that `HADOOP_CONF_DIR` or `YARN_CONF_DIR` points to the directory which contains the (client side) configuration files for the Hadoop cluster.
-These configs are used to write to the dfs and connect to the YARN ResourceManager.
+These configs are used to write to the dfs and connect to the YARN ResourceManager. The
+configuration contained in this directory will be distributed to the YARN cluster so that all
+containers used by the application use the same configuration. If the configuration references
+Java system properties or environment variables not managed by YARN, they should also be set in the
+Spark application's configuration (driver, executors, and the AM when running in client mode).
 
 There are two deploy modes that can be used to launch Spark applications on YARN. In yarn-cluster mode, the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application. In yarn-client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.
 

http://git-wip-us.apache.org/repos/asf/spark/blob/50ab8a65/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 52e4dee..019afbd 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -17,15 +17,18 @@
 
 package org.apache.spark.deploy.yarn
 
+import java.io.{File, FileOutputStream}
 import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
 import java.nio.ByteBuffer
+import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap, ListBuffer, Map}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map}
 import scala.reflect.runtime.universe
 import scala.util.{Try, Success, Failure}
 
 import com.google.common.base.Objects
+import com.google.common.io.Files
 
 import org.apache.hadoop.io.DataOutputBuffer
 import org.apache.hadoop.conf.Configuration
@@ -77,12 +80,6 @@ private[spark] class Client(
 
   def stop(): Unit = yarnClient.stop()
 
-  /* ------------------------------------------------------------------------------------- *
-   | The following methods have much in common in the stable and alpha versions of Client, |
-   | but cannot be implemented in the parent trait due to subtle API differences across    |
-   | hadoop versions.                                                                      |
-   * ------------------------------------------------------------------------------------- */
-
   /**
    * Submit an application running our ApplicationMaster to the ResourceManager.
    *
@@ -223,6 +220,10 @@ private[spark] class Client(
     val fs = FileSystem.get(hadoopConf)
     val dst = new Path(fs.getHomeDirectory(), appStagingDir)
     val nns = getNameNodesToAccess(sparkConf) + dst
+    // Used to keep track of URIs added to the distributed cache. If the same URI is added
+    // multiple times, YARN will fail to launch containers for the app with an internal
+    // error.
+    val distributedUris = new HashSet[String]
     obtainTokensForNamenodes(nns, hadoopConf, credentials)
     obtainTokenForHiveMetastore(hadoopConf, credentials)
 
@@ -241,6 +242,17 @@ private[spark] class Client(
           "for alternatives.")
     }
 
+    def addDistributedUri(uri: URI): Boolean = {
+      val uriStr = uri.toString()
+      if (distributedUris.contains(uriStr)) {
+        logWarning(s"Resource $uri added multiple times to distributed cache.")
+        false
+      } else {
+        distributedUris += uriStr
+        true
+      }
+    }
+
     /**
      * Copy the given main resource to the distributed cache if the scheme is not "local".
      * Otherwise, set the corresponding key in our SparkConf to handle it downstream.
@@ -258,11 +270,13 @@ private[spark] class Client(
       if (!localPath.isEmpty()) {
         val localURI = new URI(localPath)
         if (localURI.getScheme != LOCAL_SCHEME) {
-          val src = getQualifiedLocalPath(localURI, hadoopConf)
-          val destPath = copyFileToRemote(dst, src, replication)
-          val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
-          distCacheMgr.addResource(destFs, hadoopConf, destPath,
-            localResources, LocalResourceType.FILE, destName, statCache)
+          if (addDistributedUri(localURI)) {
+            val src = getQualifiedLocalPath(localURI, hadoopConf)
+            val destPath = copyFileToRemote(dst, src, replication)
+            val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
+            distCacheMgr.addResource(destFs, hadoopConf, destPath,
+              localResources, LocalResourceType.FILE, destName, statCache)
+          }
         } else if (confKey != null) {
           // If the resource is intended for local use only, handle this downstream
           // by setting the appropriate property
@@ -271,6 +285,13 @@ private[spark] class Client(
       }
     }
 
+    createConfArchive().foreach { file =>
+      require(addDistributedUri(file.toURI()))
+      val destPath = copyFileToRemote(dst, new Path(file.toURI()), replication)
+      distCacheMgr.addResource(fs, hadoopConf, destPath, localResources, LocalResourceType.ARCHIVE,
+        LOCALIZED_HADOOP_CONF_DIR, statCache, appMasterOnly = true)
+    }
+
     /**
      * Do the same for any additional resources passed in through ClientArguments.
      * Each resource category is represented by a 3-tuple of:
@@ -288,13 +309,15 @@ private[spark] class Client(
         flist.split(',').foreach { file =>
           val localURI = new URI(file.trim())
           if (localURI.getScheme != LOCAL_SCHEME) {
-            val localPath = new Path(localURI)
-            val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
-            val destPath = copyFileToRemote(dst, localPath, replication)
-            distCacheMgr.addResource(
-              fs, hadoopConf, destPath, localResources, resType, linkname, statCache)
-            if (addToClasspath) {
-              cachedSecondaryJarLinks += linkname
+            if (addDistributedUri(localURI)) {
+              val localPath = new Path(localURI)
+              val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
+              val destPath = copyFileToRemote(dst, localPath, replication)
+              distCacheMgr.addResource(
+                fs, hadoopConf, destPath, localResources, resType, linkname, statCache)
+              if (addToClasspath) {
+                cachedSecondaryJarLinks += linkname
+              }
             }
           } else if (addToClasspath) {
             // Resource is intended for local use only and should be added to the class path
@@ -311,13 +334,64 @@ private[spark] class Client(
   }
 
   /**
+   * Create an archive with the Hadoop config files for distribution.
+   *
+   * These are only used by the AM, since executors will use the configuration object broadcast by
+   * the driver. The files are zipped and added to the job as an archive, so that YARN will explode
+   * it when distributing to the AM. This directory is then added to the classpath of the AM
+   * process, just to make sure that everybody is using the same default config.
+   *
+   * This follows the order of precedence set by the startup scripts, in which HADOOP_CONF_DIR
+   * shows up in the classpath before YARN_CONF_DIR.
+   *
+   * Currently this makes a shallow copy of the conf directory. If there are cases where a
+   * Hadoop config directory contains subdirectories, this code will have to be fixed.
+   */
+  private def createConfArchive(): Option[File] = {
+    val hadoopConfFiles = new HashMap[String, File]()
+    Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR").foreach { envKey =>
+      sys.env.get(envKey).foreach { path =>
+        val dir = new File(path)
+        if (dir.isDirectory()) {
+          dir.listFiles().foreach { file =>
+            if (!hadoopConfFiles.contains(file.getName())) {
+              hadoopConfFiles(file.getName()) = file
+            }
+          }
+        }
+      }
+    }
+
+    if (!hadoopConfFiles.isEmpty) {
+      val hadoopConfArchive = File.createTempFile(LOCALIZED_HADOOP_CONF_DIR, ".zip",
+        new File(Utils.getLocalDir(sparkConf)))
+
+      val hadoopConfStream = new ZipOutputStream(new FileOutputStream(hadoopConfArchive))
+      try {
+        hadoopConfStream.setLevel(0)
+        hadoopConfFiles.foreach { case (name, file) =>
+          hadoopConfStream.putNextEntry(new ZipEntry(name))
+          Files.copy(file, hadoopConfStream)
+          hadoopConfStream.closeEntry()
+        }
+      } finally {
+        hadoopConfStream.close()
+      }
+
+      Some(hadoopConfArchive)
+    } else {
+      None
+    }
+  }
+
+  /**
    * Set up the environment for launching our ApplicationMaster container.
    */
   private def setupLaunchEnv(stagingDir: String): HashMap[String, String] = {
     logInfo("Setting up the launch environment for our AM container")
     val env = new HashMap[String, String]()
     val extraCp = sparkConf.getOption("spark.driver.extraClassPath")
-    populateClasspath(args, yarnConf, sparkConf, env, extraCp)
+    populateClasspath(args, yarnConf, sparkConf, env, true, extraCp)
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_STAGING_DIR") = stagingDir
     env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
@@ -718,6 +792,9 @@ object Client extends Logging {
   // Distribution-defined classpath to add to processes
   val ENV_DIST_CLASSPATH = "SPARK_DIST_CLASSPATH"
 
+  // Subdirectory where the user's hadoop config files will be placed.
+  val LOCALIZED_HADOOP_CONF_DIR = "__hadoop_conf__"
+
   /**
    * Find the user-defined Spark jar if configured, or return the jar containing this
    * class if not.
@@ -831,11 +908,19 @@ object Client extends Logging {
       conf: Configuration,
       sparkConf: SparkConf,
       env: HashMap[String, String],
+      isAM: Boolean,
       extraClassPath: Option[String] = None): Unit = {
     extraClassPath.foreach(addClasspathEntry(_, env))
     addClasspathEntry(
       YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env
     )
+
+    if (isAM) {
+      addClasspathEntry(
+        YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR +
+          LOCALIZED_HADOOP_CONF_DIR, env)
+    }
+
     if (sparkConf.getBoolean("spark.yarn.user.classpath.first", false)) {
       val userClassPath =
         if (args != null) {

http://git-wip-us.apache.org/repos/asf/spark/blob/50ab8a65/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index b06069c..9d04d24 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -277,7 +277,7 @@ class ExecutorRunnable(
   private def prepareEnvironment(container: Container): HashMap[String, String] = {
     val env = new HashMap[String, String]()
     val extraCp = sparkConf.getOption("spark.executor.extraClassPath")
-    Client.populateClasspath(null, yarnConf, sparkConf, env, extraCp)
+    Client.populateClasspath(null, yarnConf, sparkConf, env, false, extraCp)
 
     sparkConf.getExecutorEnv.foreach { case (key, value) =>
       // This assumes each executor environment variable set here is a path

http://git-wip-us.apache.org/repos/asf/spark/blob/50ab8a65/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index c1b94ac..a51c200 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -20,6 +20,11 @@ package org.apache.spark.deploy.yarn
 import java.io.File
 import java.net.URI
 
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ HashMap => MutableHashMap }
+import scala.reflect.ClassTag
+import scala.util.Try
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.MRJobConfig
@@ -30,11 +35,6 @@ import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
 
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ HashMap => MutableHashMap }
-import scala.reflect.ClassTag
-import scala.util.Try
-
 import org.apache.spark.{SparkException, SparkConf}
 import org.apache.spark.util.Utils
 
@@ -93,7 +93,7 @@ class ClientSuite extends FunSuite with Matchers with BeforeAndAfterAll {
     val env = new MutableHashMap[String, String]()
     val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
 
-    Client.populateClasspath(args, conf, sparkConf, env)
+    Client.populateClasspath(args, conf, sparkConf, env, true)
 
     val cp = env("CLASSPATH").split(":|;|<CPS>")
     s"$SPARK,$USER,$ADDED".split(",").foreach({ entry =>
@@ -104,13 +104,16 @@ class ClientSuite extends FunSuite with Matchers with BeforeAndAfterAll {
         cp should not contain (uri.getPath())
       }
     })
-    if (classOf[Environment].getMethods().exists(_.getName == "$$")) {
-      cp should contain("{{PWD}}")
-    } else if (Utils.isWindows) {
-      cp should contain("%PWD%")
-    } else {
-      cp should contain(Environment.PWD.$())
-    }
+    val pwdVar =
+      if (classOf[Environment].getMethods().exists(_.getName == "$$")) {
+        "{{PWD}}"
+      } else if (Utils.isWindows) {
+        "%PWD%"
+      } else {
+        Environment.PWD.$()
+      }
+    cp should contain(pwdVar)
+    cp should contain (s"$pwdVar${Path.SEPARATOR}${Client.LOCALIZED_HADOOP_CONF_DIR}")
     cp should not contain (Client.SPARK_JAR)
     cp should not contain (Client.APP_JAR)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/50ab8a65/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index a18c94d..3877da4 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -77,6 +77,7 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
   private var yarnCluster: MiniYARNCluster = _
   private var tempDir: File = _
   private var fakeSparkJar: File = _
+  private var hadoopConfDir: File = _
   private var logConfDir: File = _
 
   override def beforeAll() {
@@ -120,6 +121,9 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
     logInfo(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}")
 
     fakeSparkJar = File.createTempFile("sparkJar", null, tempDir)
+    hadoopConfDir = new File(tempDir, Client.LOCALIZED_HADOOP_CONF_DIR)
+    assert(hadoopConfDir.mkdir())
+    File.createTempFile("token", ".txt", hadoopConfDir)
   }
 
   override def afterAll() {
@@ -258,7 +262,7 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
       appArgs
 
     Utils.executeAndGetOutput(argv,
-      extraEnvironment = Map("YARN_CONF_DIR" -> tempDir.getAbsolutePath()))
+      extraEnvironment = Map("YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath()))
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org