Posted to commits@spark.apache.org by tg...@apache.org on 2014/09/23 18:21:03 UTC

[2/2] git commit: [SPARK-3477] Clean up code in Yarn Client / ClientBase

[SPARK-3477] Clean up code in Yarn Client / ClientBase

This is part of a broader effort to clean up the Yarn integration code after #2020.

The high-level changes in this PR include:
- Removing duplicate code, especially across the alpha and stable APIs
- Simplifying unnecessarily complex method signatures and hierarchies
- Renaming unclear variables and methods
- Organizing the logging output produced when the user runs Spark on Yarn
- Adding extensive documentation
- Privatizing classes where possible

I have tested the stable API on a Hadoop 2.4 cluster, submitting a jar that references classes in other jars in both client and cluster mode. I also made changes to the alpha API, though I do not have access to an alpha cluster. I have verified that the alpha code compiles, but it would be ideal if others could help test it.

For those interested in detailed examples, please read on.

--------------------------------------------------------------------------------------------------------

***Appendix***

- The loop that polls `getApplicationReport` from the RM is duplicated in 4 places: in the stable `Client`, in the alpha `Client`, and twice in `YarnClientSchedulerBackend`. We should not have different loops for the client and cluster deploy modes (a single shared loop is sketched in the first example after this list).
- There are many small, fragmented helper methods that are used only once and should simply be inlined. For instance, `ClientBase#getLocalPath` returns `null` under certain conditions, and its only caller, `ClientBase#addFileToClasspath`, checks whether the returned value is `null`. The caller can check that same condition itself and avoid passing `null`s around (see the second example after this list).
- In `YarnSparkHadoopUtil#addToEnvironment`, we take an argument `classpathSeparator` that always has the same value upstream (i.e. `File.pathSeparator`). This argument is now removed from the signature and from all upstream callers of this method.
- `ClientBase#copyRemoteFile` is now renamed `copyFileToRemote`, because it was unclear whether we were copying a remote file to our local file system or copying a locally visible file to a remote file system. The method body also used inaccurately named variables: `val remoteFs` referred to the file system of the locally visible file, while `val fs` referred to the remote, destination file system. These are now renamed `srcFs` and `destFs`, respectively.
- We currently log the AM container's environment and resource mappings directly as Scala collections. This is incredibly hard to read and probably too verbose for the average Spark user. In other modes (e.g. standalone), we also don't log the launch commands by default, so the logging level for this information is now set to `DEBUG`.
- None of these classes (`Client`, `ClientBase`, `YarnSparkHadoopUtil`, etc.) is intended to be used directly by a Spark application (the user should go through spark-submit instead). At the very least, they should be `private[spark]`.
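
To make the first point concrete, here is a minimal sketch (not the actual patch) of what a single, shared polling loop could look like once the four copies are consolidated. The object name, the `getReport` function parameter, and the default interval are hypothetical stand-ins; only the Hadoop record types are real.

```scala
import org.apache.hadoop.yarn.api.records.{ApplicationId, ApplicationReport, YarnApplicationState}

object MonitorSketch {
  /** Poll the application report until the application reaches a terminal state. */
  def monitorUntilFinished(
      appId: ApplicationId,
      getReport: ApplicationId => ApplicationReport,
      intervalMs: Long = 1000L): YarnApplicationState = {
    while (true) {
      Thread.sleep(intervalMs)
      val state = getReport(appId).getYarnApplicationState
      if (state == YarnApplicationState.FINISHED ||
          state == YarnApplicationState.FAILED ||
          state == YarnApplicationState.KILLED) {
        return state
      }
    }
    // Unreachable, but needed to satisfy the compiler since `while` has type Unit
    throw new IllegalStateException("monitoring loop exited unexpectedly")
  }
}
```

Both `Client` implementations and `YarnClientSchedulerBackend` can then call one such method instead of each carrying its own copy of the loop.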
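
The second point (and the `File.pathSeparator` point) can be illustrated with a similarly simplified sketch. The `ClasspathSketch` object and its trivial `addClasspathEntry` helper are hypothetical, not the real API; the idea is only that the caller checks the "local" scheme directly instead of null-checking a helper's return value, and that the path separator is no longer threaded through as a parameter. The actual, fuller version appears in the `ClientBase` diff below.

```scala
import java.io.File
import java.net.URI
import scala.collection.mutable.HashMap

object ClasspathSketch {
  private val LOCAL_SCHEME = "local"

  /** Add a file to the classpath, resolving "local:" URIs in place (no nulls passed around). */
  def addFileToClasspath(path: String, fileName: String, env: HashMap[String, String]): Unit = {
    if (path != null) {
      val uri = new URI(path)
      if (uri.getScheme == LOCAL_SCHEME) {
        addClasspathEntry(uri.getPath, env)
        return
      }
    }
    if (fileName != null) {
      addClasspathEntry(fileName, env)
    }
  }

  /** Append an entry to CLASSPATH, hard-coding File.pathSeparator as the separator. */
  private def addClasspathEntry(entry: String, env: HashMap[String, String]): Unit = {
    env("CLASSPATH") = env.get("CLASSPATH").map(_ + File.pathSeparator + entry).getOrElse(entry)
  }
}
```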

Author: Andrew Or <an...@gmail.com>

Closes #2350 from andrewor14/yarn-cleanup and squashes the following commits:

39e8c7b [Andrew Or] Address review comments
6619f9b [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-cleanup
2ca6d64 [Andrew Or] Improve logging in application monitor
a3b9693 [Andrew Or] Minor changes
7dd6298 [Andrew Or] Simplify ClientBase#monitorApplication
547487c [Andrew Or] Provide default values for null application report entries
a0ad1e9 [Andrew Or] Fix class not found error
1590141 [Andrew Or] Address review comments
45ccdea [Andrew Or] Remove usages of getAMMemory
d8e33b6 [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-cleanup
ed0b42d [Andrew Or] Fix alpha compilation error
c0587b4 [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-cleanup
6d74888 [Andrew Or] Minor comment changes
6573c1d [Andrew Or] Clean up, simplify and document code for setting classpaths
e4779b6 [Andrew Or] Clean up log messages + variable naming in ClientBase
8766d37 [Andrew Or] Heavily add documentation to Client* classes + various clean-ups
6c94d79 [Andrew Or] Various cleanups in ClientBase and ClientArguments
ef7069a [Andrew Or] Clean up YarnClientSchedulerBackend more
6de9072 [Andrew Or] Guard against potential NPE in debug logging mode
fabe4c4 [Andrew Or] Reuse more code in YarnClientSchedulerBackend
3f941dc [Andrew Or] First cut at simplifying the Client (stable and alpha)


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c4022dd5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c4022dd5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c4022dd5

Branch: refs/heads/master
Commit: c4022dd52b4827323ff956632dc7623f546da937
Parents: 14f8c34
Author: Andrew Or <an...@gmail.com>
Authored: Tue Sep 23 11:20:52 2014 -0500
Committer: Thomas Graves <tg...@apache.org>
Committed: Tue Sep 23 11:20:52 2014 -0500

----------------------------------------------------------------------
 .../org/apache/spark/deploy/yarn/Client.scala   | 145 ++--
 .../spark/deploy/yarn/ClientArguments.scala     |  67 +-
 .../apache/spark/deploy/yarn/ClientBase.scala   | 682 +++++++++++--------
 .../yarn/ClientDistributedCacheManager.scala    |  97 ++-
 .../deploy/yarn/ExecutorRunnableUtil.scala      |  16 +-
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala |  63 +-
 .../cluster/YarnClientSchedulerBackend.scala    | 145 ++--
 .../spark/deploy/yarn/ClientBaseSuite.scala     |  18 +-
 .../org/apache/spark/deploy/yarn/Client.scala   | 167 ++---
 9 files changed, 738 insertions(+), 662 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c4022dd5/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index aff9ab7..5a20532 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -23,13 +23,11 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.DataOutputBuffer
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.YarnClientImpl
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{Apps, Records}
+import org.apache.hadoop.yarn.util.Records
 
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -37,7 +35,10 @@ import org.apache.spark.deploy.SparkHadoopUtil
 /**
  * Version of [[org.apache.spark.deploy.yarn.ClientBase]] tailored to YARN's alpha API.
  */
-class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: SparkConf)
+private[spark] class Client(
+    val args: ClientArguments,
+    val hadoopConf: Configuration,
+    val sparkConf: SparkConf)
   extends YarnClientImpl with ClientBase with Logging {
 
   def this(clientArgs: ClientArguments, spConf: SparkConf) =
@@ -45,112 +46,86 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
 
   def this(clientArgs: ClientArguments) = this(clientArgs, new SparkConf())
 
-  val args = clientArgs
-  val conf = hadoopConf
-  val sparkConf = spConf
-  var rpc: YarnRPC = YarnRPC.create(conf)
-  val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+  val yarnConf: YarnConfiguration = new YarnConfiguration(hadoopConf)
 
+  /* ------------------------------------------------------------------------------------- *
+   | The following methods have much in common in the stable and alpha versions of Client, |
+   | but cannot be implemented in the parent trait due to subtle API differences across    |
+   | hadoop versions.                                                                      |
+   * ------------------------------------------------------------------------------------- */
 
-  // for client user who want to monitor app status by itself.
-  def runApp() = {
-    validateArgs()
-
+  /** Submit an application running our ApplicationMaster to the ResourceManager. */
+  override def submitApplication(): ApplicationId = {
     init(yarnConf)
     start()
-    logClusterResourceDetails()
 
-    val newApp = super.getNewApplication()
-    val appId = newApp.getApplicationId()
+    logInfo("Requesting a new application from cluster with %d NodeManagers"
+      .format(getYarnClusterMetrics.getNumNodeManagers))
 
-    verifyClusterResources(newApp)
-    val appContext = createApplicationSubmissionContext(appId)
-    val appStagingDir = getAppStagingDir(appId)
-    val localResources = prepareLocalResources(appStagingDir)
-    val env = setupLaunchEnv(localResources, appStagingDir)
-    val amContainer = createContainerLaunchContext(newApp, localResources, env)
+    // Get a new application from our RM
+    val newAppResponse = getNewApplication()
+    val appId = newAppResponse.getApplicationId()
 
-    val capability = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
-    // Memory for the ApplicationMaster.
-    capability.setMemory(args.amMemory + memoryOverhead)
-    amContainer.setResource(capability)
+    // Verify whether the cluster has enough resources for our AM
+    verifyClusterResources(newAppResponse)
 
-    appContext.setQueue(args.amQueue)
-    appContext.setAMContainerSpec(amContainer)
-    appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
+    // Set up the appropriate contexts to launch our AM
+    val containerContext = createContainerLaunchContext(newAppResponse)
+    val appContext = createApplicationSubmissionContext(appId, containerContext)
 
-    submitApp(appContext)
+    // Finally, submit and monitor the application
+    logInfo(s"Submitting application ${appId.getId} to ResourceManager")
+    submitApplication(appContext)
     appId
   }
 
-  def run() {
-    val appId = runApp()
-    monitorApplication(appId)
-  }
-
-  def logClusterResourceDetails() {
-    val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics
-    logInfo("Got cluster metric info from ASM, numNodeManagers = " +
-      clusterMetrics.getNumNodeManagers)
+  /**
+   * Set up a context for launching our ApplicationMaster container.
+   * In the Yarn alpha API, the memory requirements of this container must be set in
+   * the ContainerLaunchContext instead of the ApplicationSubmissionContext.
+   */
+  override def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
+      : ContainerLaunchContext = {
+    val containerContext = super.createContainerLaunchContext(newAppResponse)
+    val capability = Records.newRecord(classOf[Resource])
+    capability.setMemory(args.amMemory + amMemoryOverhead)
+    containerContext.setResource(capability)
+    containerContext
   }
 
-
-  def createApplicationSubmissionContext(appId: ApplicationId): ApplicationSubmissionContext = {
-    logInfo("Setting up application submission context for ASM")
+  /** Set up the context for submitting our ApplicationMaster. */
+  def createApplicationSubmissionContext(
+      appId: ApplicationId,
+      containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
     val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
     appContext.setApplicationId(appId)
     appContext.setApplicationName(args.appName)
+    appContext.setQueue(args.amQueue)
+    appContext.setAMContainerSpec(containerContext)
+    appContext.setUser(UserGroupInformation.getCurrentUser.getShortUserName)
     appContext
   }
 
-  def setupSecurityToken(amContainer: ContainerLaunchContext) = {
-    // Setup security tokens.
+  /**
+   * Set up security tokens for launching our ApplicationMaster container.
+   * ContainerLaunchContext#setContainerTokens is renamed `setTokens` in the stable API.
+   */
+  override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = {
     val dob = new DataOutputBuffer()
     credentials.writeTokenStorageToStream(dob)
     amContainer.setContainerTokens(ByteBuffer.wrap(dob.getData()))
   }
 
-  def submitApp(appContext: ApplicationSubmissionContext) = {
-    // Submit the application to the applications manager.
-    logInfo("Submitting application to ASM")
-    super.submitApplication(appContext)
-  }
-
-  def monitorApplication(appId: ApplicationId): Boolean = {
-    val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
-
-    while (true) {
-      Thread.sleep(interval)
-      val report = super.getApplicationReport(appId)
-
-      logInfo("Application report from ASM: \n" +
-        "\t application identifier: " + appId.toString() + "\n" +
-        "\t appId: " + appId.getId() + "\n" +
-        "\t clientToken: " + report.getClientToken() + "\n" +
-        "\t appDiagnostics: " + report.getDiagnostics() + "\n" +
-        "\t appMasterHost: " + report.getHost() + "\n" +
-        "\t appQueue: " + report.getQueue() + "\n" +
-        "\t appMasterRpcPort: " + report.getRpcPort() + "\n" +
-        "\t appStartTime: " + report.getStartTime() + "\n" +
-        "\t yarnAppState: " + report.getYarnApplicationState() + "\n" +
-        "\t distributedFinalState: " + report.getFinalApplicationStatus() + "\n" +
-        "\t appTrackingUrl: " + report.getTrackingUrl() + "\n" +
-        "\t appUser: " + report.getUser()
-      )
-
-      val state = report.getYarnApplicationState()
-      if (state == YarnApplicationState.FINISHED ||
-        state == YarnApplicationState.FAILED ||
-        state == YarnApplicationState.KILLED) {
-        return true
-      }
-    }
-    true
-  }
+  /**
+   * Return the security token used by this client to communicate with the ApplicationMaster.
+   * If no security is enabled, the token returned by the report is null.
+   * ApplicationReport#getClientToken is renamed `getClientToAMToken` in the stable API.
+   */
+  override def getClientToken(report: ApplicationReport): String =
+    Option(report.getClientToken).getOrElse("")
 }
 
 object Client {
-
   def main(argStrings: Array[String]) {
     if (!sys.props.contains("SPARK_SUBMIT")) {
       println("WARNING: This client is deprecated and will be removed in a " +
@@ -158,19 +133,17 @@ object Client {
     }
 
     // Set an env variable indicating we are running in YARN mode.
-    // Note that anything with SPARK prefix gets propagated to all (remote) processes
+    // Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes
     System.setProperty("SPARK_YARN_MODE", "true")
-
     val sparkConf = new SparkConf
 
     try {
       val args = new ClientArguments(argStrings, sparkConf)
       new Client(args, sparkConf).run()
     } catch {
-      case e: Exception => {
+      case e: Exception =>
         Console.err.println(e.getMessage)
         System.exit(1)
-      }
     }
 
     System.exit(0)

http://git-wip-us.apache.org/repos/asf/spark/blob/c4022dd5/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 40d8d6d..201b742 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -17,15 +17,14 @@
 
 package org.apache.spark.deploy.yarn
 
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.SparkConf
-import org.apache.spark.scheduler.InputFormatInfo
 import org.apache.spark.util.{Utils, IntParam, MemoryParam}
 
 
 // TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware !
-class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
+private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) {
   var addJars: String = null
   var files: String = null
   var archives: String = null
@@ -35,28 +34,56 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
   var executorMemory = 1024 // MB
   var executorCores = 1
   var numExecutors = 2
-  var amQueue = sparkConf.get("QUEUE", "default")
+  var amQueue = sparkConf.get("spark.yarn.queue", "default")
   var amMemory: Int = 512 // MB
   var appName: String = "Spark"
   var priority = 0
 
-  parseArgs(args.toList)
+  // Additional memory to allocate to containers
+  // For now, use driver's memory overhead as our AM container's memory overhead
+  val amMemoryOverhead = sparkConf.getInt(
+    "spark.yarn.driver.memoryOverhead", YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
+  val executorMemoryOverhead = sparkConf.getInt(
+    "spark.yarn.executor.memoryOverhead", YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
 
-  // env variable SPARK_YARN_DIST_ARCHIVES/SPARK_YARN_DIST_FILES set in yarn-client then
-  // it should default to hdfs://
-  files = Option(files).getOrElse(sys.env.get("SPARK_YARN_DIST_FILES").orNull)
-  archives = Option(archives).getOrElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES").orNull)
+  parseArgs(args.toList)
+  loadEnvironmentArgs()
+  validateArgs()
+
+  /** Load any default arguments provided through environment variables and Spark properties. */
+  private def loadEnvironmentArgs(): Unit = {
+    // For backward compatibility, SPARK_YARN_DIST_{ARCHIVES/FILES} should be resolved to hdfs://,
+    // while spark.yarn.dist.{archives/files} should be resolved to file:// (SPARK-2051).
+    files = Option(files)
+      .orElse(sys.env.get("SPARK_YARN_DIST_FILES"))
+      .orElse(sparkConf.getOption("spark.yarn.dist.files").map(p => Utils.resolveURIs(p)))
+      .orNull
+    archives = Option(archives)
+      .orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES"))
+      .orElse(sparkConf.getOption("spark.yarn.dist.archives").map(p => Utils.resolveURIs(p)))
+      .orNull
+  }
 
-  // spark.yarn.dist.archives/spark.yarn.dist.files defaults to use file:// if not specified,
-  // for both yarn-client and yarn-cluster
-  files = Option(files).getOrElse(sparkConf.getOption("spark.yarn.dist.files").
-    map(p => Utils.resolveURIs(p)).orNull)
-  archives = Option(archives).getOrElse(sparkConf.getOption("spark.yarn.dist.archives").
-    map(p => Utils.resolveURIs(p)).orNull)
+  /**
+   * Fail fast if any arguments provided are invalid.
+   * This is intended to be called only after the provided arguments have been parsed.
+   */
+  private def validateArgs(): Unit = {
+    // TODO: memory checks are outdated (SPARK-3476)
+    Map[Boolean, String](
+      (numExecutors <= 0) -> "You must specify at least 1 executor!",
+      (amMemory <= amMemoryOverhead) -> s"AM memory must be > $amMemoryOverhead MB",
+      (executorMemory <= executorMemoryOverhead) ->
+        s"Executor memory must be > $executorMemoryOverhead MB"
+    ).foreach { case (errorCondition, errorMessage) =>
+      if (errorCondition) {
+        throw new IllegalArgumentException(errorMessage + "\n" + getUsageMessage())
+      }
+    }
+  }
 
   private def parseArgs(inputArgs: List[String]): Unit = {
-    val userArgsBuffer: ArrayBuffer[String] = new ArrayBuffer[String]()
-  
+    val userArgsBuffer = new ArrayBuffer[String]()
     var args = inputArgs
 
     while (!args.isEmpty) {
@@ -138,16 +165,14 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
     userArgs = userArgsBuffer.readOnly
   }
 
-
-  def getUsageMessage(unknownParam: Any = null): String = {
+  private def getUsageMessage(unknownParam: List[String] = null): String = {
     val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else ""
-
     message +
       "Usage: org.apache.spark.deploy.yarn.Client [options] \n" +
       "Options:\n" +
       "  --jar JAR_PATH             Path to your application's JAR file (required in yarn-cluster mode)\n" +
       "  --class CLASS_NAME         Name of your application's main class (required)\n" +
-      "  --arg ARGS                 Argument to be passed to your application's main class.\n" +
+      "  --arg ARG                  Argument to be passed to your application's main class.\n" +
       "                             Multiple invocations are possible, each will be passed in order.\n" +
       "  --num-executors NUM        Number of executors to start (Default: 2)\n" +
       "  --executor-cores NUM       Number of cores for the executors (Default: 1).\n" +

http://git-wip-us.apache.org/repos/asf/spark/blob/c4022dd5/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 6ae4d49..4870b0c 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.io.File
 import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
 
 import scala.collection.JavaConversions._
@@ -37,154 +36,107 @@ import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.Records
+
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException}
 
 /**
- * The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN. The
- * Client submits an application to the YARN ResourceManager.
+ * The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN.
+ * The Client submits an application to the YARN ResourceManager.
  */
-trait ClientBase extends Logging {
-  val args: ClientArguments
-  val conf: Configuration
-  val sparkConf: SparkConf
-  val yarnConf: YarnConfiguration
-  val credentials = UserGroupInformation.getCurrentUser().getCredentials()
-  private val SPARK_STAGING: String = ".sparkStaging"
+private[spark] trait ClientBase extends Logging {
+  import ClientBase._
+
+  protected val args: ClientArguments
+  protected val hadoopConf: Configuration
+  protected val sparkConf: SparkConf
+  protected val yarnConf: YarnConfiguration
+  protected val credentials = UserGroupInformation.getCurrentUser.getCredentials
+  protected val amMemoryOverhead = args.amMemoryOverhead // MB
+  protected val executorMemoryOverhead = args.executorMemoryOverhead // MB
   private val distCacheMgr = new ClientDistributedCacheManager()
 
-  // Staging directory is private! -> rwx--------
-  val STAGING_DIR_PERMISSION: FsPermission =
-    FsPermission.createImmutable(Integer.parseInt("700", 8).toShort)
-  // App files are world-wide readable and owner writable -> rw-r--r--
-  val APP_FILE_PERMISSION: FsPermission =
-    FsPermission.createImmutable(Integer.parseInt("644", 8).toShort)
-
-  // Additional memory overhead - in mb.
-  protected def memoryOverhead: Int = sparkConf.getInt("spark.yarn.driver.memoryOverhead",
-    YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
-
-  // TODO(harvey): This could just go in ClientArguments.
-  def validateArgs() = {
-    Map(
-      (args.numExecutors <= 0) -> "Error: You must specify at least 1 executor!",
-      (args.amMemory <= memoryOverhead) -> ("Error: AM memory size must be" +
-        "greater than: " + memoryOverhead),
-      (args.executorMemory <= memoryOverhead) -> ("Error: Executor memory size" +
-        "must be greater than: " + memoryOverhead.toString)
-    ).foreach { case(cond, errStr) =>
-      if (cond) {
-        logError(errStr)
-        throw new IllegalArgumentException(args.getUsageMessage())
-      }
-    }
-  }
-
-  def getAppStagingDir(appId: ApplicationId): String = {
-    SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR
-  }
-
-  def verifyClusterResources(app: GetNewApplicationResponse) = {
-    val maxMem = app.getMaximumResourceCapability().getMemory()
-    logInfo("Max mem capabililty of a single resource in this cluster " + maxMem)
-
-    // If we have requested more then the clusters max for a single resource then exit.
-    if (args.executorMemory > maxMem) {
-      val errorMessage =
-        "Required executor memory (%d MB), is above the max threshold (%d MB) of this cluster."
-          .format(args.executorMemory, maxMem)
-
-      logError(errorMessage)
-      throw new IllegalArgumentException(errorMessage)
-    }
-    val amMem = args.amMemory + memoryOverhead
+  /**
+   * Fail fast if we have requested more resources per container than is available in the cluster.
+   */
+  protected def verifyClusterResources(newAppResponse: GetNewApplicationResponse): Unit = {
+    val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
+    logInfo("Verifying our application has not requested more than the maximum " +
+      s"memory capability of the cluster ($maxMem MB per container)")
+    val executorMem = args.executorMemory + executorMemoryOverhead
+    if (executorMem > maxMem) {
+      throw new IllegalArgumentException(s"Required executor memory ($executorMem MB) " +
+        s"is above the max threshold ($maxMem MB) of this cluster!")
+    }
+    val amMem = args.amMemory + amMemoryOverhead
     if (amMem > maxMem) {
-
-      val errorMessage = "Required AM memory (%d) is above the max threshold (%d) of this cluster."
-        .format(amMem, maxMem)
-      logError(errorMessage)
-      throw new IllegalArgumentException(errorMessage)
+      throw new IllegalArgumentException(s"Required AM memory ($amMem MB) " +
+        s"is above the max threshold ($maxMem MB) of this cluster!")
     }
-
     // We could add checks to make sure the entire cluster has enough resources but that involves
     // getting all the node reports and computing ourselves.
   }
 
-  /** See if two file systems are the same or not. */
-  private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
-    val srcUri = srcFs.getUri()
-    val dstUri = destFs.getUri()
-    if (srcUri.getScheme() == null) {
-      return false
-    }
-    if (!srcUri.getScheme().equals(dstUri.getScheme())) {
-      return false
-    }
-    var srcHost = srcUri.getHost()
-    var dstHost = dstUri.getHost()
-    if ((srcHost != null) && (dstHost != null)) {
-      try {
-        srcHost = InetAddress.getByName(srcHost).getCanonicalHostName()
-        dstHost = InetAddress.getByName(dstHost).getCanonicalHostName()
-      } catch {
-        case e: UnknownHostException =>
-          return false
-      }
-      if (!srcHost.equals(dstHost)) {
-        return false
-      }
-    } else if (srcHost == null && dstHost != null) {
-      return false
-    } else if (srcHost != null && dstHost == null) {
-      return false
-    }
-    if (srcUri.getPort() != dstUri.getPort()) {
-      false
-    } else {
-      true
-    }
-  }
-
-  /** Copy the file into HDFS if needed. */
-  private[yarn] def copyRemoteFile(
-      dstDir: Path,
-      originalPath: Path,
+  /**
+   * Copy the given file to a remote file system (e.g. HDFS) if needed.
+   * The file is only copied if the source and destination file systems are different. This is used
+   * for preparing resources for launching the ApplicationMaster container. Exposed for testing.
+   */
+  def copyFileToRemote(
+      destDir: Path,
+      srcPath: Path,
       replication: Short,
       setPerms: Boolean = false): Path = {
-    val fs = FileSystem.get(conf)
-    val remoteFs = originalPath.getFileSystem(conf)
-    var newPath = originalPath
-    if (!compareFs(remoteFs, fs)) {
-      newPath = new Path(dstDir, originalPath.getName())
-      logInfo("Uploading " + originalPath + " to " + newPath)
-      FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf)
-      fs.setReplication(newPath, replication)
-      if (setPerms) fs.setPermission(newPath, new FsPermission(APP_FILE_PERMISSION))
+    val destFs = destDir.getFileSystem(hadoopConf)
+    val srcFs = srcPath.getFileSystem(hadoopConf)
+    var destPath = srcPath
+    if (!compareFs(srcFs, destFs)) {
+      destPath = new Path(destDir, srcPath.getName())
+      logInfo(s"Uploading resource $srcPath -> $destPath")
+      FileUtil.copy(srcFs, srcPath, destFs, destPath, false, hadoopConf)
+      destFs.setReplication(destPath, replication)
+      if (setPerms) {
+        destFs.setPermission(destPath, new FsPermission(APP_FILE_PERMISSION))
+      }
+    } else {
+      logInfo(s"Source and destination file systems are the same. Not copying $srcPath")
     }
     // Resolve any symlinks in the URI path so using a "current" symlink to point to a specific
     // version shows the specific version in the distributed cache configuration
-    val qualPath = fs.makeQualified(newPath)
-    val fc = FileContext.getFileContext(qualPath.toUri(), conf)
-    val destPath = fc.resolvePath(qualPath)
-    destPath
+    val qualifiedDestPath = destFs.makeQualified(destPath)
+    val fc = FileContext.getFileContext(qualifiedDestPath.toUri(), hadoopConf)
+    fc.resolvePath(qualifiedDestPath)
   }
 
-  private def qualifyForLocal(localURI: URI): Path = {
-    var qualifiedURI = localURI
-    // If not specified, assume these are in the local filesystem to keep behavior like Hadoop
-    if (qualifiedURI.getScheme() == null) {
-      qualifiedURI = new URI(FileSystem.getLocal(conf).makeQualified(new Path(qualifiedURI)).toString)
-    }
+  /**
+   * Given a local URI, resolve it and return a qualified local path that corresponds to the URI.
+   * This is used for preparing local resources to be included in the container launch context.
+   */
+  private def getQualifiedLocalPath(localURI: URI): Path = {
+    val qualifiedURI =
+      if (localURI.getScheme == null) {
+        // If not specified, assume this is in the local filesystem to keep the behavior
+        // consistent with that of Hadoop
+        new URI(FileSystem.getLocal(hadoopConf).makeQualified(new Path(localURI)).toString)
+      } else {
+        localURI
+      }
     new Path(qualifiedURI)
   }
 
+  /**
+   * Upload any resources to the distributed cache if needed. If a resource is intended to be
+   * consumed locally, set up the appropriate config for downstream code to handle it properly.
+   * This is used for setting up a container launch context for our ApplicationMaster.
+   * Exposed for testing.
+   */
   def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = {
-    logInfo("Preparing Local resources")
-    // Upload Spark and the application JAR to the remote file system if necessary. Add them as
-    // local resources to the application master.
-    val fs = FileSystem.get(conf)
+    logInfo("Preparing resources for our AM container")
+    // Upload Spark and the application JAR to the remote file system if necessary,
+    // and add them as local resources to the application master.
+    val fs = FileSystem.get(hadoopConf)
     val dst = new Path(fs.getHomeDirectory(), appStagingDir)
-    val nns = ClientBase.getNameNodesToAccess(sparkConf) + dst
-    ClientBase.obtainTokensForNamenodes(nns, conf, credentials)
+    val nns = getNameNodesToAccess(sparkConf) + dst
+    obtainTokensForNamenodes(nns, hadoopConf, credentials)
 
     val replication = sparkConf.getInt("spark.yarn.submit.file.replication", 3).toShort
     val localResources = HashMap[String, LocalResource]()
@@ -200,73 +152,84 @@ trait ClientBase extends Logging {
         "for alternatives.")
     }
 
+    /**
+     * Copy the given main resource to the distributed cache if the scheme is not "local".
+     * Otherwise, set the corresponding key in our SparkConf to handle it downstream.
+     * Each resource is represented by a 4-tuple of:
+     *   (1) destination resource name,
+     *   (2) local path to the resource,
+     *   (3) Spark property key to set if the scheme is not local, and
+     *   (4) whether to set permissions for this resource
+     */
     List(
-      (ClientBase.SPARK_JAR, ClientBase.sparkJar(sparkConf), ClientBase.CONF_SPARK_JAR),
-      (ClientBase.APP_JAR, args.userJar, ClientBase.CONF_SPARK_USER_JAR),
-      ("log4j.properties", oldLog4jConf.getOrElse(null), null)
-    ).foreach { case(destName, _localPath, confKey) =>
+      (SPARK_JAR, sparkJar(sparkConf), CONF_SPARK_JAR, false),
+      (APP_JAR, args.userJar, CONF_SPARK_USER_JAR, true),
+      ("log4j.properties", oldLog4jConf.orNull, null, false)
+    ).foreach { case (destName, _localPath, confKey, setPermissions) =>
       val localPath: String = if (_localPath != null) _localPath.trim() else ""
-      if (! localPath.isEmpty()) {
+      if (!localPath.isEmpty()) {
         val localURI = new URI(localPath)
-        if (!ClientBase.LOCAL_SCHEME.equals(localURI.getScheme())) {
-          val setPermissions = destName.equals(ClientBase.APP_JAR)
-          val destPath = copyRemoteFile(dst, qualifyForLocal(localURI), replication, setPermissions)
-          val destFs = FileSystem.get(destPath.toUri(), conf)
-          distCacheMgr.addResource(destFs, conf, destPath, localResources, LocalResourceType.FILE,
-            destName, statCache)
+        if (localURI.getScheme != LOCAL_SCHEME) {
+          val src = getQualifiedLocalPath(localURI)
+          val destPath = copyFileToRemote(dst, src, replication, setPermissions)
+          val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
+          distCacheMgr.addResource(destFs, hadoopConf, destPath,
+            localResources, LocalResourceType.FILE, destName, statCache)
         } else if (confKey != null) {
+          // If the resource is intended for local use only, handle this downstream
+          // by setting the appropriate property
           sparkConf.set(confKey, localPath)
         }
       }
     }
 
+    /**
+     * Do the same for any additional resources passed in through ClientArguments.
+     * Each resource category is represented by a 3-tuple of:
+     *   (1) comma separated list of resources in this category,
+     *   (2) resource type, and
+     *   (3) whether to add these resources to the classpath
+     */
     val cachedSecondaryJarLinks = ListBuffer.empty[String]
-    val fileLists = List( (args.addJars, LocalResourceType.FILE, true),
+    List(
+      (args.addJars, LocalResourceType.FILE, true),
       (args.files, LocalResourceType.FILE, false),
-      (args.archives, LocalResourceType.ARCHIVE, false) )
-    fileLists.foreach { case (flist, resType, addToClasspath) =>
+      (args.archives, LocalResourceType.ARCHIVE, false)
+    ).foreach { case (flist, resType, addToClasspath) =>
       if (flist != null && !flist.isEmpty()) {
-        flist.split(',').foreach { case file: String =>
+        flist.split(',').foreach { file =>
           val localURI = new URI(file.trim())
-          if (!ClientBase.LOCAL_SCHEME.equals(localURI.getScheme())) {
+          if (localURI.getScheme != LOCAL_SCHEME) {
             val localPath = new Path(localURI)
             val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
-            val destPath = copyRemoteFile(dst, localPath, replication)
-            distCacheMgr.addResource(fs, conf, destPath, localResources, resType,
-              linkname, statCache)
+            val destPath = copyFileToRemote(dst, localPath, replication)
+            distCacheMgr.addResource(
+              fs, hadoopConf, destPath, localResources, resType, linkname, statCache)
             if (addToClasspath) {
               cachedSecondaryJarLinks += linkname
             }
           } else if (addToClasspath) {
+            // Resource is intended for local use only and should be added to the class path
             cachedSecondaryJarLinks += file.trim()
           }
         }
       }
     }
-    logInfo("Prepared Local resources " + localResources)
-    sparkConf.set(ClientBase.CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))
+    if (cachedSecondaryJarLinks.nonEmpty) {
+      sparkConf.set(CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))
+    }
 
-    UserGroupInformation.getCurrentUser().addCredentials(credentials)
     localResources
   }
 
-  /** Get all application master environment variables set on this SparkConf */
-  def getAppMasterEnv: Seq[(String, String)] = {
-    val prefix = "spark.yarn.appMasterEnv."
-    sparkConf.getAll.filter{case (k, v) => k.startsWith(prefix)}
-      .map{case (k, v) => (k.substring(prefix.length), v)}
-  }
-
-
-  def setupLaunchEnv(
-      localResources: HashMap[String, LocalResource],
-      stagingDir: String): HashMap[String, String] = {
-    logInfo("Setting up the launch environment")
-
+  /**
+   * Set up the environment for launching our ApplicationMaster container.
+   */
+  private def setupLaunchEnv(stagingDir: String): HashMap[String, String] = {
+    logInfo("Setting up the launch environment for our AM container")
     val env = new HashMap[String, String]()
-
     val extraCp = sparkConf.getOption("spark.driver.extraClassPath")
-    ClientBase.populateClasspath(args, yarnConf, sparkConf, env, extraCp)
+    populateClasspath(args, yarnConf, sparkConf, env, extraCp)
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_STAGING_DIR") = stagingDir
     env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
@@ -275,42 +238,20 @@ trait ClientBase extends Logging {
     distCacheMgr.setDistFilesEnv(env)
     distCacheMgr.setDistArchivesEnv(env)
 
-    getAppMasterEnv.foreach { case (key, value) =>
-      YarnSparkHadoopUtil.addToEnvironment(env, key, value, File.pathSeparator)
-    }
+    // Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.*
+    val amEnvPrefix = "spark.yarn.appMasterEnv."
+    sparkConf.getAll
+      .filter { case (k, v) => k.startsWith(amEnvPrefix) }
+      .map { case (k, v) => (k.substring(amEnvPrefix.length), v) }
+      .foreach { case (k, v) => YarnSparkHadoopUtil.addPathToEnvironment(env, k, v) }
 
     // Keep this for backwards compatibility but users should move to the config
     sys.env.get("SPARK_YARN_USER_ENV").foreach { userEnvs =>
       // Allow users to specify some environment variables.
-      YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs, File.pathSeparator)
-
+      YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs)
       // Pass SPARK_YARN_USER_ENV itself to the AM so it can use it to set up executor environments.
       env("SPARK_YARN_USER_ENV") = userEnvs
     }
-    env
-  }
-
-  def userArgsToString(clientArgs: ClientArguments): String = {
-    val prefix = " --arg "
-    val args = clientArgs.userArgs
-    val retval = new StringBuilder()
-    for (arg <- args) {
-      retval.append(prefix).append(" ").append(YarnSparkHadoopUtil.escapeForShell(arg))
-    }
-    retval.toString
-  }
-
-  def setupSecurityToken(amContainer: ContainerLaunchContext)
-
-  def createContainerLaunchContext(
-        newApp: GetNewApplicationResponse,
-        localResources: HashMap[String, LocalResource],
-        env: HashMap[String, String]): ContainerLaunchContext = {
-    logInfo("Setting up container launch context")
-    val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
-    amContainer.setLocalResources(localResources)
-
-    val isLaunchingDriver = args.userClass != null
 
     // In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to
     // executors. But we can't just set spark.executor.extraJavaOptions, because the driver's
@@ -320,6 +261,7 @@ trait ClientBase extends Logging {
     // Note that to warn the user about the deprecation in cluster mode, some code from
     // SparkConf#validateSettings() is duplicated here (to avoid triggering the condition
     // described above).
+    val isLaunchingDriver = args.userClass != null
     if (isLaunchingDriver) {
       sys.env.get("SPARK_JAVA_OPTS").foreach { value =>
         val warning =
@@ -342,14 +284,30 @@ trait ClientBase extends Logging {
         env("SPARK_JAVA_OPTS") = value
       }
     }
-    amContainer.setEnvironment(env)
 
-    val amMemory = args.amMemory
+    env
+  }
+
+  /**
+   * Set up a ContainerLaunchContext to launch our ApplicationMaster container.
+   * This sets up the launch environment, java options, and the command for launching the AM.
+   */
+  protected def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
+      : ContainerLaunchContext = {
+    logInfo("Setting up container launch context for our AM")
+
+    val appId = newAppResponse.getApplicationId
+    val appStagingDir = getAppStagingDir(appId)
+    val localResources = prepareLocalResources(appStagingDir)
+    val launchEnv = setupLaunchEnv(appStagingDir)
+    val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
+    amContainer.setLocalResources(localResources)
+    amContainer.setEnvironment(launchEnv)
 
     val javaOpts = ListBuffer[String]()
 
     // Add Xmx for AM memory
-    javaOpts += "-Xmx" + amMemory + "m"
+    javaOpts += "-Xmx" + args.amMemory + "m"
 
     val tmpDir = new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
     javaOpts += "-Djava.io.tmpdir=" + tmpDir
@@ -361,8 +319,7 @@ trait ClientBase extends Logging {
     // Instead of using this, rely on cpusets by YARN to enforce "proper" Spark behavior in
     // multi-tenant environments. Not sure how default Java GC behaves if it is limited to subset
     // of cores on a node.
-    val useConcurrentAndIncrementalGC = env.isDefinedAt("SPARK_USE_CONC_INCR_GC") &&
-      java.lang.Boolean.parseBoolean(env("SPARK_USE_CONC_INCR_GC"))
+    val useConcurrentAndIncrementalGC = launchEnv.get("SPARK_USE_CONC_INCR_GC").exists(_.toBoolean)
     if (useConcurrentAndIncrementalGC) {
       // In our expts, using (default) throughput collector has severe perf ramifications in
       // multi-tenant machines
@@ -380,6 +337,8 @@ trait ClientBase extends Logging {
       javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v")
     }
 
+    // Include driver-specific java options if we are launching a driver
+    val isLaunchingDriver = args.userClass != null
     if (isLaunchingDriver) {
       sparkConf.getOption("spark.driver.extraJavaOptions")
         .orElse(sys.env.get("SPARK_JAVA_OPTS"))
@@ -397,19 +356,27 @@ trait ClientBase extends Logging {
       } else {
         Nil
       }
+    val userJar =
+      if (args.userJar != null) {
+        Seq("--jar", args.userJar)
+      } else {
+        Nil
+      }
     val amClass =
       if (isLaunchingDriver) {
-        classOf[ApplicationMaster].getName()
+        Class.forName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
       } else {
-        classOf[ApplicationMaster].getName().replace("ApplicationMaster", "ExecutorLauncher")
+        Class.forName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
       }
+    val userArgs = args.userArgs.flatMap { arg =>
+      Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
+    }
     val amArgs =
-      Seq(amClass) ++ userClass ++
-      (if (args.userJar != null) Seq("--jar", args.userJar) else Nil) ++
-      Seq("--executor-memory", args.executorMemory.toString,
+      Seq(amClass) ++ userClass ++ userJar ++ userArgs ++
+      Seq(
+        "--executor-memory", args.executorMemory.toString,
         "--executor-cores", args.executorCores.toString,
-        "--num-executors ", args.numExecutors.toString,
-        userArgsToString(args))
+        "--num-executors ", args.numExecutors.toString)
 
     // Command for the ApplicationMaster
     val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++
@@ -418,41 +385,153 @@ trait ClientBase extends Logging {
         "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
         "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
 
-    logInfo("Yarn AM launch context:")
-    logInfo(s"  user class: ${args.userClass}")
-    logInfo(s"  env:        $env")
-    logInfo(s"  command:    ${commands.mkString(" ")}")
-
     // TODO: it would be nicer to just make sure there are no null commands here
     val printableCommands = commands.map(s => if (s == null) "null" else s).toList
     amContainer.setCommands(printableCommands)
 
-    setupSecurityToken(amContainer)
+    logDebug("===============================================================================")
+    logDebug("Yarn AM launch context:")
+    logDebug(s"    user class: ${Option(args.userClass).getOrElse("N/A")}")
+    logDebug("    env:")
+    launchEnv.foreach { case (k, v) => logDebug(s"        $k -> $v") }
+    logDebug("    resources:")
+    localResources.foreach { case (k, v) => logDebug(s"        $k -> $v")}
+    logDebug("    command:")
+    logDebug(s"        ${printableCommands.mkString(" ")}")
+    logDebug("===============================================================================")
 
     // send the acl settings into YARN to control who has access via YARN interfaces
     val securityManager = new SecurityManager(sparkConf)
     amContainer.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager))
+    setupSecurityToken(amContainer)
+    UserGroupInformation.getCurrentUser().addCredentials(credentials)
 
     amContainer
   }
+
+  /**
+   * Report the state of an application until it has exited, either successfully or
+   * due to some failure, then return the application state.
+   *
+   * @param appId ID of the application to monitor.
+   * @param returnOnRunning Whether to also return the application state when it is RUNNING.
+   * @param logApplicationReport Whether to log details of the application report every iteration.
+   * @return state of the application, one of FINISHED, FAILED, KILLED, and RUNNING.
+   */
+  def monitorApplication(
+      appId: ApplicationId,
+      returnOnRunning: Boolean = false,
+      logApplicationReport: Boolean = true): YarnApplicationState = {
+    val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
+    var lastState: YarnApplicationState = null
+    while (true) {
+      Thread.sleep(interval)
+      val report = getApplicationReport(appId)
+      val state = report.getYarnApplicationState
+
+      if (logApplicationReport) {
+        logInfo(s"Application report for $appId (state: $state)")
+        val details = Seq[(String, String)](
+          ("client token", getClientToken(report)),
+          ("diagnostics", report.getDiagnostics),
+          ("ApplicationMaster host", report.getHost),
+          ("ApplicationMaster RPC port", report.getRpcPort.toString),
+          ("queue", report.getQueue),
+          ("start time", report.getStartTime.toString),
+          ("final status", report.getFinalApplicationStatus.toString),
+          ("tracking URL", report.getTrackingUrl),
+          ("user", report.getUser)
+        )
+
+        // Use more loggable format if value is null or empty
+        val formattedDetails = details
+          .map { case (k, v) =>
+            val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A")
+            s"\n\t $k: $newValue" }
+          .mkString("")
+
+        // If DEBUG is enabled, log report details every iteration
+        // Otherwise, log them every time the application changes state
+        if (log.isDebugEnabled) {
+          logDebug(formattedDetails)
+        } else if (lastState != state) {
+          logInfo(formattedDetails)
+        }
+      }
+
+      if (state == YarnApplicationState.FINISHED ||
+        state == YarnApplicationState.FAILED ||
+        state == YarnApplicationState.KILLED) {
+        return state
+      }
+
+      if (returnOnRunning && state == YarnApplicationState.RUNNING) {
+        return state
+      }
+
+      lastState = state
+    }
+
+    // Never reached, but keeps compiler happy
+    throw new SparkException("While loop is depleted! This should never happen...")
+  }
+
+  /**
+   * Submit an application to the ResourceManager and monitor its state.
+   * This continues until the application has exited for any reason.
+   */
+  def run(): Unit = monitorApplication(submitApplication())
+
+  /* --------------------------------------------------------------------------------------- *
+   |  Methods that cannot be implemented here due to API differences across hadoop versions  |
+   * --------------------------------------------------------------------------------------- */
+
+  /** Submit an application running our ApplicationMaster to the ResourceManager. */
+  def submitApplication(): ApplicationId
+
+  /** Set up security tokens for launching our ApplicationMaster container. */
+  protected def setupSecurityToken(containerContext: ContainerLaunchContext): Unit
+
+  /** Get the application report from the ResourceManager for an application we have submitted. */
+  protected def getApplicationReport(appId: ApplicationId): ApplicationReport
+
+  /**
+   * Return the security token used by this client to communicate with the ApplicationMaster.
+   * If no security is enabled, the token returned by the report is null.
+   */
+  protected def getClientToken(report: ApplicationReport): String
 }
 
-object ClientBase extends Logging {
+private[spark] object ClientBase extends Logging {
+
+  // Alias for the Spark assembly jar and the user jar
   val SPARK_JAR: String = "__spark__.jar"
   val APP_JAR: String = "__app__.jar"
+
+  // URI scheme that identifies local resources
   val LOCAL_SCHEME = "local"
+
+  // Staging directory for any temporary jars or files
+  val SPARK_STAGING: String = ".sparkStaging"
+
+  // Location of any user-defined Spark jars
   val CONF_SPARK_JAR = "spark.yarn.jar"
-  /**
-   * This is an internal config used to propagate the location of the user's jar file to the
-   * driver/executors.
-   */
+  val ENV_SPARK_JAR = "SPARK_JAR"
+
+  // Internal config to propagate the location of the user's jar to the driver/executors
   val CONF_SPARK_USER_JAR = "spark.yarn.user.jar"
-  /**
-   * This is an internal config used to propagate the list of extra jars to add to the classpath
-   * of executors.
-   */
+
+  // Internal config to propagate the locations of any extra jars to add to the classpath
+  // of the executors
   val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars"
-  val ENV_SPARK_JAR = "SPARK_JAR"
+
+  // Staging directory is private! -> rwx--------
+  val STAGING_DIR_PERMISSION: FsPermission =
+    FsPermission.createImmutable(Integer.parseInt("700", 8).toShort)
+
+  // App files are world-wide readable and owner writable -> rw-r--r--
+  val APP_FILE_PERMISSION: FsPermission =
+    FsPermission.createImmutable(Integer.parseInt("644", 8).toShort)
 
   /**
    * Find the user-defined Spark jar if configured, or return the jar containing this
@@ -461,7 +540,7 @@ object ClientBase extends Logging {
    * This method first looks in the SparkConf object for the CONF_SPARK_JAR key, and in the
    * user environment if that is not found (for backwards compatibility).
    */
-  def sparkJar(conf: SparkConf) = {
+  private def sparkJar(conf: SparkConf): String = {
     if (conf.contains(CONF_SPARK_JAR)) {
       conf.get(CONF_SPARK_JAR)
     } else if (System.getenv(ENV_SPARK_JAR) != null) {
@@ -474,16 +553,22 @@ object ClientBase extends Logging {
     }
   }
 
-  def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) = {
+  /**
+   * Return the path to the given application's staging directory.
+   */
+  private def getAppStagingDir(appId: ApplicationId): String = {
+    SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR
+  }
+
+  /**
+   * Populate the classpath entry in the given environment map with any application
+   * classpath specified through the Hadoop and Yarn configurations.
+   */
+  def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]): Unit = {
     val classPathElementsToAdd = getYarnAppClasspath(conf) ++ getMRAppClasspath(conf)
     for (c <- classPathElementsToAdd.flatten) {
-      YarnSparkHadoopUtil.addToEnvironment(
-        env,
-        Environment.CLASSPATH.name,
-        c.trim,
-        File.pathSeparator)
+      YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, c.trim)
     }
-    classPathElementsToAdd
   }
 
   private def getYarnAppClasspath(conf: Configuration): Option[Seq[String]] =
@@ -519,7 +604,7 @@ object ClientBase extends Logging {
 
   /**
    * In Hadoop 0.23, the MR application classpath comes with the YARN application
-   * classpath.  In Hadoop 2.0, it's an array of Strings, and in 2.2+ it's a String.
+   * classpath. In Hadoop 2.0, it's an array of Strings, and in 2.2+ it's a String.
    * So we need to use reflection to retrieve it.
    */
   def getDefaultMRApplicationClasspath: Option[Seq[String]] = {
@@ -545,8 +630,16 @@ object ClientBase extends Logging {
     triedDefault.toOption
   }
 
-  def populateClasspath(args: ClientArguments, conf: Configuration, sparkConf: SparkConf,
-      env: HashMap[String, String], extraClassPath: Option[String] = None) {
+  /**
+   * Populate the classpath entry in the given environment map.
+   * This includes the user jar, Spark jar, and any extra application jars.
+   */
+  def populateClasspath(
+      args: ClientArguments,
+      conf: Configuration,
+      sparkConf: SparkConf,
+      env: HashMap[String, String],
+      extraClassPath: Option[String] = None): Unit = {
     extraClassPath.foreach(addClasspathEntry(_, env))
     addClasspathEntry(Environment.PWD.$(), env)
 
@@ -554,36 +647,40 @@ object ClientBase extends Logging {
     if (sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean) {
       addUserClasspath(args, sparkConf, env)
       addFileToClasspath(sparkJar(sparkConf), SPARK_JAR, env)
-      ClientBase.populateHadoopClasspath(conf, env)
+      populateHadoopClasspath(conf, env)
     } else {
       addFileToClasspath(sparkJar(sparkConf), SPARK_JAR, env)
-      ClientBase.populateHadoopClasspath(conf, env)
+      populateHadoopClasspath(conf, env)
       addUserClasspath(args, sparkConf, env)
     }
 
     // Append all jar files under the working directory to the classpath.
-    addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + "*", env);
+    addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + "*", env)
   }
 
   /**
    * Adds the user jars which have local: URIs (or alternate names, such as APP_JAR) explicitly
    * to the classpath.
    */
-  private def addUserClasspath(args: ClientArguments, conf: SparkConf,
-      env: HashMap[String, String]) = {
-    if (args != null) {
-      addFileToClasspath(args.userJar, APP_JAR, env)
-      if (args.addJars != null) {
-        args.addJars.split(",").foreach { case file: String =>
-          addFileToClasspath(file, null, env)
-        }
+  private def addUserClasspath(
+      args: ClientArguments,
+      conf: SparkConf,
+      env: HashMap[String, String]): Unit = {
+
+    // If `args` is not null, we are launching an AM container.
+    // Otherwise, we are launching executor containers.
+    val (mainJar, secondaryJars) =
+      if (args != null) {
+        (args.userJar, args.addJars)
+      } else {
+        (conf.get(CONF_SPARK_USER_JAR, null), conf.get(CONF_SPARK_YARN_SECONDARY_JARS, null))
       }
-    } else {
-      val userJar = conf.get(CONF_SPARK_USER_JAR, null)
-      addFileToClasspath(userJar, APP_JAR, env)
 
-      val cachedSecondaryJarLinks = conf.get(CONF_SPARK_YARN_SECONDARY_JARS, "").split(",")
-      cachedSecondaryJarLinks.foreach(jar => addFileToClasspath(jar, null, env))
+    addFileToClasspath(mainJar, APP_JAR, env)
+    if (secondaryJars != null) {
+      secondaryJars.split(",").filter(_.nonEmpty).foreach { jar =>
+        addFileToClasspath(jar, null, env)
+      }
     }
   }
 
@@ -599,46 +696,44 @@ object ClientBase extends Logging {
    * @param fileName  Alternate name for the file (optional).
    * @param env       Map holding the environment variables.
    */
-  private def addFileToClasspath(path: String, fileName: String,
-      env: HashMap[String, String]) : Unit = {
+  private def addFileToClasspath(
+      path: String,
+      fileName: String,
+      env: HashMap[String, String]): Unit = {
     if (path != null) {
       scala.util.control.Exception.ignoring(classOf[URISyntaxException]) {
-        val localPath = getLocalPath(path)
-        if (localPath != null) {
-          addClasspathEntry(localPath, env)
+        val uri = new URI(path)
+        if (uri.getScheme == LOCAL_SCHEME) {
+          addClasspathEntry(uri.getPath, env)
           return
         }
       }
     }
     if (fileName != null) {
-      addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + fileName, env);
+      addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + fileName, env)
     }
   }
 
   /**
-   * Returns the local path if the URI is a "local:" URI, or null otherwise.
+   * Add the given path to the classpath entry of the given environment map.
+   * If the classpath is already set, this appends the new path to the existing classpath.
    */
-  private def getLocalPath(resource: String): String = {
-    val uri = new URI(resource)
-    if (LOCAL_SCHEME.equals(uri.getScheme())) {
-      return uri.getPath()
-    }
-    null
-  }
-
-  private def addClasspathEntry(path: String, env: HashMap[String, String]) =
-    YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, path,
-            File.pathSeparator)
+  private def addClasspathEntry(path: String, env: HashMap[String, String]): Unit =
+    YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, path)
 
   /**
    * Get the list of namenodes the user may access.
    */
-  private[yarn] def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = {
-    sparkConf.get("spark.yarn.access.namenodes", "").split(",").map(_.trim()).filter(!_.isEmpty)
-      .map(new Path(_)).toSet
+  def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = {
+    sparkConf.get("spark.yarn.access.namenodes", "")
+      .split(",")
+      .map(_.trim())
+      .filter(!_.isEmpty)
+      .map(new Path(_))
+      .toSet
   }
 
-  private[yarn] def getTokenRenewer(conf: Configuration): String = {
+  def getTokenRenewer(conf: Configuration): String = {
     val delegTokenRenewer = Master.getMasterPrincipal(conf)
     logDebug("delegation token renewer is: " + delegTokenRenewer)
     if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
@@ -652,17 +747,54 @@ object ClientBase extends Logging {
   /**
    * Obtains tokens for the namenodes passed in and adds them to the credentials.
    */
-  private[yarn] def obtainTokensForNamenodes(paths: Set[Path], conf: Configuration,
-    creds: Credentials) {
+  def obtainTokensForNamenodes(
+      paths: Set[Path],
+      conf: Configuration,
+      creds: Credentials): Unit = {
     if (UserGroupInformation.isSecurityEnabled()) {
       val delegTokenRenewer = getTokenRenewer(conf)
+      paths.foreach { dst =>
+        val dstFs = dst.getFileSystem(conf)
+        logDebug("getting token for namenode: " + dst)
+        dstFs.addDelegationTokens(delegTokenRenewer, creds)
+      }
+    }
+  }
 
-      paths.foreach {
-        dst =>
-          val dstFs = dst.getFileSystem(conf)
-          logDebug("getting token for namenode: " + dst)
-          dstFs.addDelegationTokens(delegTokenRenewer, creds)
+  /**
+   * Return whether the two file systems are the same.
+   */
+  private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
+    val srcUri = srcFs.getUri()
+    val dstUri = destFs.getUri()
+    if (srcUri.getScheme() == null) {
+      return false
+    }
+    if (!srcUri.getScheme().equals(dstUri.getScheme())) {
+      return false
+    }
+    var srcHost = srcUri.getHost()
+    var dstHost = dstUri.getHost()
+    if ((srcHost != null) && (dstHost != null)) {
+      try {
+        srcHost = InetAddress.getByName(srcHost).getCanonicalHostName()
+        dstHost = InetAddress.getByName(dstHost).getCanonicalHostName()
+      } catch {
+        case e: UnknownHostException =>
+          return false
       }
+      if (!srcHost.equals(dstHost)) {
+        return false
+      }
+    } else if (srcHost == null && dstHost != null) {
+      return false
+    } else if (srcHost != null && dstHost == null) {
+      return false
+    }
+    if (srcUri.getPort() != dstUri.getPort()) {
+      false
+    } else {
+      true
     }
   }
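
A note on the addFileToClasspath change above: the old getLocalPath helper returned null for non-"local:" URIs, which its only caller then had to check, so the scheme check is now inlined. Below is a minimal, self-contained sketch of that check. It is illustration only, not part of the patch; the object name, helper name, and sample paths are made up.

  import java.net.URI

  // Sketch only: mirrors the scheme check now inlined in addFileToClasspath.
  // A "local:" URI names a file assumed to already exist on every node, so only
  // its path component is added to the classpath; anything else falls back to
  // the localized file name in the container's working directory.
  object LocalSchemeSketch {
    private val LOCAL_SCHEME = "local"

    def classpathEntryFor(path: String): Option[String] = {
      val uri = new URI(path)
      if (uri.getScheme == LOCAL_SCHEME) Some(uri.getPath) else None
    }

    def main(args: Array[String]): Unit = {
      println(classpathEntryFor("local:/opt/libs/dep.jar"))          // Some(/opt/libs/dep.jar)
      println(classpathEntryFor("hdfs://nn:8020/user/foo/dep.jar"))  // None
    }
  }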
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c4022dd5/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
index 9b7f1fc..c592ecf 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
@@ -19,29 +19,24 @@ package org.apache.spark.deploy.yarn
 
 import java.net.URI
 
+import scala.collection.mutable.{HashMap, LinkedHashMap, Map}
+
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileStatus
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.fs.permission.FsAction
-import org.apache.hadoop.yarn.api.records.LocalResource
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility
-import org.apache.hadoop.yarn.api.records.LocalResourceType
+import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.util.{Records, ConverterUtils}
 
-import org.apache.spark.Logging 
-
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.LinkedHashMap
-import scala.collection.mutable.Map
-
+import org.apache.spark.Logging
 
 /** Client side methods to setup the Hadoop distributed cache */
-class ClientDistributedCacheManager() extends Logging {
-  private val distCacheFiles: Map[String, Tuple3[String, String, String]] = 
-    LinkedHashMap[String, Tuple3[String, String, String]]()
-  private val distCacheArchives: Map[String, Tuple3[String, String, String]] = 
-    LinkedHashMap[String, Tuple3[String, String, String]]()
+private[spark] class ClientDistributedCacheManager() extends Logging {
+
+  // Mappings from remote URI to (file size, modification time, visibility)
+  private val distCacheFiles: Map[String, (String, String, String)] =
+    LinkedHashMap[String, (String, String, String)]()
+  private val distCacheArchives: Map[String, (String, String, String)] =
+    LinkedHashMap[String, (String, String, String)]()
 
 
   /**
@@ -68,9 +63,9 @@ class ClientDistributedCacheManager() extends Logging {
       resourceType: LocalResourceType,
       link: String,
       statCache: Map[URI, FileStatus],
-      appMasterOnly: Boolean = false) = {
+      appMasterOnly: Boolean = false): Unit = {
     val destStatus = fs.getFileStatus(destPath)
-    val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
+    val amJarRsrc = Records.newRecord(classOf[LocalResource])
     amJarRsrc.setType(resourceType)
     val visibility = getVisibility(conf, destPath.toUri(), statCache)
     amJarRsrc.setVisibility(visibility)
@@ -80,7 +75,7 @@ class ClientDistributedCacheManager() extends Logging {
     if (link == null || link.isEmpty()) throw new Exception("You must specify a valid link name")
     localResources(link) = amJarRsrc
     
-    if (appMasterOnly == false) {
+    if (!appMasterOnly) {
       val uri = destPath.toUri()
       val pathURI = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), null, link)
       if (resourceType == LocalResourceType.FILE) {
@@ -95,12 +90,10 @@ class ClientDistributedCacheManager() extends Logging {
 
   /**
    * Adds the necessary cache file env variables to the env passed in
-   * @param env
    */
-  def setDistFilesEnv(env: Map[String, String]) = {
+  def setDistFilesEnv(env: Map[String, String]): Unit = {
     val (keys, tupleValues) = distCacheFiles.unzip
     val (sizes, timeStamps, visibilities) = tupleValues.unzip3
-
     if (keys.size > 0) {
       env("SPARK_YARN_CACHE_FILES") = keys.reduceLeft[String] { (acc,n) => acc + "," + n }
       env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") = 
@@ -114,12 +107,10 @@ class ClientDistributedCacheManager() extends Logging {
 
   /**
    * Adds the necessary cache archive env variables to the env passed in
-   * @param env
    */
-  def setDistArchivesEnv(env: Map[String, String]) = {
+  def setDistArchivesEnv(env: Map[String, String]): Unit = {
     val (keys, tupleValues) = distCacheArchives.unzip
     val (sizes, timeStamps, visibilities) = tupleValues.unzip3
-
     if (keys.size > 0) {
       env("SPARK_YARN_CACHE_ARCHIVES") = keys.reduceLeft[String] { (acc,n) => acc + "," + n }
       env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") = 
@@ -133,25 +124,21 @@ class ClientDistributedCacheManager() extends Logging {
 
   /**
    * Returns the local resource visibility depending on the cache file permissions
-   * @param conf
-   * @param uri
-   * @param statCache
    * @return LocalResourceVisibility
    */
-  def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]):
-      LocalResourceVisibility = {
+  def getVisibility(
+      conf: Configuration,
+      uri: URI,
+      statCache: Map[URI, FileStatus]): LocalResourceVisibility = {
     if (isPublic(conf, uri, statCache)) {
-      return LocalResourceVisibility.PUBLIC 
-    } 
-    LocalResourceVisibility.PRIVATE
+      LocalResourceVisibility.PUBLIC
+    } else {
+      LocalResourceVisibility.PRIVATE
+    }
   }
 
   /**
-   * Returns a boolean to denote whether a cache file is visible to all(public)
-   * or not
-   * @param conf
-   * @param uri
-   * @param statCache
+   * Returns a boolean to denote whether a cache file is visible to all (public)
    * @return true if the path in the uri is visible to all, false otherwise
    */
   def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): Boolean = {
@@ -167,13 +154,12 @@ class ClientDistributedCacheManager() extends Logging {
   /**
    * Returns true if all ancestors of the specified path have the 'execute'
    * permission set for all users (i.e. that other users can traverse
-   * the directory heirarchy to the given path)
-   * @param fs
-   * @param path
-   * @param statCache
+   * the directory hierarchy to the given path)
    * @return true if all ancestors have the 'execute' permission set for all users
    */
-  def ancestorsHaveExecutePermissions(fs: FileSystem, path: Path, 
+  def ancestorsHaveExecutePermissions(
+      fs: FileSystem,
+      path: Path,
       statCache: Map[URI, FileStatus]): Boolean =  {
     var current = path
     while (current != null) {
@@ -187,32 +173,25 @@ class ClientDistributedCacheManager() extends Logging {
   }
 
   /**
-   * Checks for a given path whether the Other permissions on it 
+   * Checks for a given path whether the Other permissions on it
    * imply the permission in the passed FsAction
-   * @param fs
-   * @param path
-   * @param action
-   * @param statCache
    * @return true if the path in the uri is visible to all, false otherwise
    */
-  def checkPermissionOfOther(fs: FileSystem, path: Path,
-      action: FsAction, statCache: Map[URI, FileStatus]): Boolean = {
+  def checkPermissionOfOther(
+      fs: FileSystem,
+      path: Path,
+      action: FsAction,
+      statCache: Map[URI, FileStatus]): Boolean = {
     val status = getFileStatus(fs, path.toUri(), statCache)
     val perms = status.getPermission()
     val otherAction = perms.getOtherAction()
-    if (otherAction.implies(action)) {
-      return true
-    }
-    false
+    otherAction.implies(action)
   }
 
   /**
-   * Checks to see if the given uri exists in the cache, if it does it 
+   * Checks to see if the given uri exists in the cache, if it does it
    * returns the existing FileStatus, otherwise it stats the uri, stores
    * it in the cache, and returns the FileStatus.
-   * @param fs
-   * @param uri
-   * @param statCache
    * @return FileStatus
    */
   def getFileStatus(fs: FileSystem, uri: URI, statCache: Map[URI, FileStatus]): FileStatus = {

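Regarding the simplified checkPermissionOfOther above, which now returns otherAction.implies(action) directly: the sketch below shows how FsAction implication behaves for a typical rwxr-xr-x permission. It is illustration only, not part of the patch, and builds the permission by hand rather than reading it from a real FileSystem.

  import org.apache.hadoop.fs.permission.{FsAction, FsPermission}

  // Sketch only: the FsAction check that checkPermissionOfOther now returns
  // directly. With rwxr-xr-x, "other" users may read and traverse (EXECUTE)
  // the entry but not WRITE to it.
  object FsActionSketch {
    def main(args: Array[String]): Unit = {
      val perms = new FsPermission(FsAction.ALL, FsAction.READ_EXECUTE, FsAction.READ_EXECUTE)
      val otherAction = perms.getOtherAction()
      println(otherAction.implies(FsAction.READ))    // true
      println(otherAction.implies(FsAction.EXECUTE)) // true
      println(otherAction.implies(FsAction.WRITE))   // false
    }
  }
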
http://git-wip-us.apache.org/repos/asf/spark/blob/c4022dd5/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
index f56f72c..bbbf615 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.io.File
 import java.net.URI
 
 import scala.collection.JavaConversions._
@@ -128,9 +127,9 @@ trait ExecutorRunnableUtil extends Logging {
       localResources: HashMap[String, LocalResource],
       timestamp: String,
       size: String,
-      vis: String) = {
+      vis: String): Unit = {
     val uri = new URI(file)
-    val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
+    val amJarRsrc = Records.newRecord(classOf[LocalResource])
     amJarRsrc.setType(rtype)
     amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis))
     amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
@@ -175,14 +174,17 @@ trait ExecutorRunnableUtil extends Logging {
     ClientBase.populateClasspath(null, yarnConf, sparkConf, env, extraCp)
 
     sparkConf.getExecutorEnv.foreach { case (key, value) =>
-      YarnSparkHadoopUtil.addToEnvironment(env, key, value, File.pathSeparator)
+      // This assumes each executor environment variable set here is a path
+      // This is kept for backward compatibility and consistency with Hadoop
+      YarnSparkHadoopUtil.addPathToEnvironment(env, key, value)
     }
 
     // Keep this for backwards compatibility but users should move to the config
-    YarnSparkHadoopUtil.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"),
-      File.pathSeparator)
+    sys.env.get("SPARK_YARN_USER_ENV").foreach { userEnvs =>
+      YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs)
+    }
 
-    System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
+    System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k, v) => env(k) = v }
     env
   }
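
For context on the executor environment handling above: executor environment variables are read from spark.executorEnv.* properties and fed through YarnSparkHadoopUtil.addPathToEnvironment, so repeated keys are appended with File.pathSeparator rather than overwritten. A minimal sketch, not part of the patch, using example property names:

  import org.apache.spark.SparkConf

  // Sketch only: reading spark.executorEnv.* properties the same way the
  // executor environment setup above does, via sparkConf.getExecutorEnv.
  object ExecutorEnvSketch {
    def main(args: Array[String]): Unit = {
      val sparkConf = new SparkConf(false)
        .set("spark.executorEnv.LD_LIBRARY_PATH", "/opt/native")
        .set("spark.executorEnv.PYTHONPATH", "/opt/py")
      sparkConf.getExecutorEnv.foreach { case (key, value) =>
        println(s"$key -> $value")  // e.g. LD_LIBRARY_PATH -> /opt/native
      }
    }
  }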
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c4022dd5/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 4a33e34..0b712c2 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.deploy.yarn
 
 import java.lang.{Boolean => JBoolean}
+import java.io.File
 import java.util.{Collections, Set => JSet}
 import java.util.regex.Matcher
 import java.util.regex.Pattern
@@ -29,14 +30,12 @@ import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.Credentials
 import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.util.StringInterner
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType
 import org.apache.hadoop.yarn.util.RackResolver
 import org.apache.hadoop.conf.Configuration
 
-import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.Utils
 
@@ -100,30 +99,26 @@ object YarnSparkHadoopUtil {
   private val hostToRack = new ConcurrentHashMap[String, String]()
   private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
 
-  def addToEnvironment(
-      env: HashMap[String, String],
-      variable: String,
-      value: String,
-      classPathSeparator: String) = {
-    var envVariable = ""
-    if (env.get(variable) == None) {
-      envVariable = value
-    } else {
-      envVariable = env.get(variable).get + classPathSeparator + value
-    }
-    env put (StringInterner.weakIntern(variable), StringInterner.weakIntern(envVariable))
+  /**
+   * Add a path variable to the given environment map.
+   * If the map already contains this key, append the value to the existing value instead.
+   */
+  def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit = {
+    val newValue = if (env.contains(key)) { env(key) + File.pathSeparator + value } else value
+    env.put(key, newValue)
   }
 
-  def setEnvFromInputString(
-      env: HashMap[String, String],
-      envString: String,
-      classPathSeparator: String) = {
-    if (envString != null && envString.length() > 0) {
-      var childEnvs = envString.split(",")
-      var p = Pattern.compile(getEnvironmentVariableRegex())
+  /**
+   * Set zero or more environment variables specified by the given input string.
+   * The input string is expected to take the form "KEY1=VAL1,KEY2=VAL2,KEY3=VAL3".
+   */
+  def setEnvFromInputString(env: HashMap[String, String], inputString: String): Unit = {
+    if (inputString != null && inputString.length() > 0) {
+      val childEnvs = inputString.split(",")
+      val p = Pattern.compile(environmentVariableRegex)
       for (cEnv <- childEnvs) {
-        var parts = cEnv.split("=") // split on '='
-        var m = p.matcher(parts(1))
+        val parts = cEnv.split("=") // split on '='
+        val m = p.matcher(parts(1))
         val sb = new StringBuffer
         while (m.find()) {
           val variable = m.group(1)
@@ -131,8 +126,7 @@ object YarnSparkHadoopUtil {
           if (env.get(variable) != None) {
             replace = env.get(variable).get
           } else {
-            // if this key is not configured for the child .. get it
-            // from the env
+            // if this key is not configured for the child .. get it from the env
             replace = System.getenv(variable)
             if (replace == null) {
            // the env key is not present anywhere .. simply set it
@@ -142,14 +136,15 @@ object YarnSparkHadoopUtil {
           m.appendReplacement(sb, Matcher.quoteReplacement(replace))
         }
         m.appendTail(sb)
-        addToEnvironment(env, parts(0), sb.toString(), classPathSeparator)
+        // This treats the environment variable as a path variable delimited by `File.pathSeparator`
+        // This is kept for backward compatibility and consistency with Hadoop's behavior
+        addPathToEnvironment(env, parts(0), sb.toString)
       }
     }
   }
 
-  private def getEnvironmentVariableRegex() : String = {
-    val osName = System.getProperty("os.name")
-    if (osName startsWith "Windows") {
+  private val environmentVariableRegex: String = {
+    if (Utils.isWindows) {
       "%([A-Za-z_][A-Za-z0-9_]*?)%"
     } else {
       "\\$([A-Za-z_][A-Za-z0-9_]*)"
@@ -181,14 +176,14 @@ object YarnSparkHadoopUtil {
     }
   }
 
-  private[spark] def lookupRack(conf: Configuration, host: String): String = {
+  def lookupRack(conf: Configuration, host: String): String = {
     if (!hostToRack.contains(host)) {
       populateRackInfo(conf, host)
     }
     hostToRack.get(host)
   }
 
-  private[spark] def populateRackInfo(conf: Configuration, hostname: String) {
+  def populateRackInfo(conf: Configuration, hostname: String) {
     Utils.checkHost(hostname)
 
     if (!hostToRack.containsKey(hostname)) {
@@ -212,8 +207,8 @@ object YarnSparkHadoopUtil {
     }
   }
 
-  private[spark] def getApplicationAclsForYarn(securityMgr: SecurityManager):
-      Map[ApplicationAccessType, String] = {
+  def getApplicationAclsForYarn(securityMgr: SecurityManager)
+      : Map[ApplicationAccessType, String] = {
     Map[ApplicationAccessType, String] (
       ApplicationAccessType.VIEW_APP -> securityMgr.getViewAcls,
       ApplicationAccessType.MODIFY_APP -> securityMgr.getModifyAcls

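To make the addPathToEnvironment change above concrete: with the classPathSeparator parameter gone, File.pathSeparator is always used, and setting the same key twice appends rather than overwrites. The sketch below copies the method body for illustration; the object name and sample values are made up.

  import java.io.File

  import scala.collection.mutable.HashMap

  // Sketch only: the append-on-duplicate behavior of addPathToEnvironment.
  object PathEnvSketch {
    def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit = {
      val newValue = if (env.contains(key)) { env(key) + File.pathSeparator + value } else value
      env.put(key, newValue)
    }

    def main(args: Array[String]): Unit = {
      val env = new HashMap[String, String]()
      addPathToEnvironment(env, "CLASSPATH", "/opt/app/conf")
      addPathToEnvironment(env, "CLASSPATH", "/opt/app/lib/*")
      println(env("CLASSPATH"))  // /opt/app/conf:/opt/app/lib/* on Unix (';'-separated on Windows)
    }
  }
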
http://git-wip-us.apache.org/repos/asf/spark/blob/c4022dd5/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 6aa6475..200a308 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler.cluster
 
 import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
 import org.apache.spark.{SparkException, Logging, SparkContext}
-import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnSparkHadoopUtil}
+import org.apache.spark.deploy.yarn.{Client, ClientArguments}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 
 import scala.collection.mutable.ArrayBuffer
@@ -34,115 +34,120 @@ private[spark] class YarnClientSchedulerBackend(
     minRegisteredRatio = 0.8
   }
 
-  var client: Client = null
-  var appId: ApplicationId = null
-  var checkerThread: Thread = null
-  var stopping: Boolean = false
-  var totalExpectedExecutors = 0
-
-  private[spark] def addArg(optionName: String, envVar: String, sysProp: String,
-      arrayBuf: ArrayBuffer[String]) {
-    if (System.getenv(envVar) != null) {
-      arrayBuf += (optionName, System.getenv(envVar))
-    } else if (sc.getConf.contains(sysProp)) {
-      arrayBuf += (optionName, sc.getConf.get(sysProp))
-    }
-  }
+  private var client: Client = null
+  private var appId: ApplicationId = null
+  private var stopping: Boolean = false
+  private var totalExpectedExecutors = 0
 
+  /**
+   * Create a Yarn client to submit an application to the ResourceManager.
+   * This waits until the application is running.
+   */
   override def start() {
     super.start()
-
     val driverHost = conf.get("spark.driver.host")
     val driverPort = conf.get("spark.driver.port")
     val hostport = driverHost + ":" + driverPort
     sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.appUIHostPort) }
 
     val argsArrayBuf = new ArrayBuffer[String]()
-    argsArrayBuf += (
-      "--args", hostport
-    )
-
-    // process any optional arguments, given either as environment variables
-    // or system properties. use the defaults already defined in ClientArguments
-    // if things aren't specified. system properties override environment
-    // variables.
-    List(("--driver-memory", "SPARK_MASTER_MEMORY", "spark.master.memory"),
-      ("--driver-memory", "SPARK_DRIVER_MEMORY", "spark.driver.memory"),
-      ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.executor.instances"),
-      ("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"),
-      ("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"),
-      ("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"),
-      ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
-      ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"),
-      ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"),
-      ("--name", "SPARK_YARN_APP_NAME", "spark.app.name"))
-    .foreach { case (optName, envVar, sysProp) => addArg(optName, envVar, sysProp, argsArrayBuf) }
-
-    logDebug("ClientArguments called with: " + argsArrayBuf)
+    argsArrayBuf += ("--arg", hostport)
+    argsArrayBuf ++= getExtraClientArguments
+
+    logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" "))
     val args = new ClientArguments(argsArrayBuf.toArray, conf)
     totalExpectedExecutors = args.numExecutors
     client = new Client(args, conf)
-    appId = client.runApp()
-    waitForApp()
-    checkerThread = yarnApplicationStateCheckerThread()
+    appId = client.submitApplication()
+    waitForApplication()
+    asyncMonitorApplication()
   }
 
-  def waitForApp() {
-
-    // TODO : need a better way to find out whether the executors are ready or not
-    // maybe by resource usage report?
-    while(true) {
-      val report = client.getApplicationReport(appId)
-
-      logInfo("Application report from ASM: \n" +
-        "\t appMasterRpcPort: " + report.getRpcPort() + "\n" +
-        "\t appStartTime: " + report.getStartTime() + "\n" +
-        "\t yarnAppState: " + report.getYarnApplicationState() + "\n"
+  /**
+   * Return any extra command line arguments to be passed to Client provided in the form of
+   * environment variables or Spark properties.
+   */
+  private def getExtraClientArguments: Seq[String] = {
+    val extraArgs = new ArrayBuffer[String]
+    val optionTuples = // List of (target Client argument, environment variable, Spark property)
+      List(
+        ("--driver-memory", "SPARK_MASTER_MEMORY", "spark.master.memory"),
+        ("--driver-memory", "SPARK_DRIVER_MEMORY", "spark.driver.memory"),
+        ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.executor.instances"),
+        ("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"),
+        ("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"),
+        ("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"),
+        ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"),
+        ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"),
+        ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"),
+        ("--name", "SPARK_YARN_APP_NAME", "spark.app.name")
       )
-
-      // Ready to go, or already gone.
-      val state = report.getYarnApplicationState()
-      if (state == YarnApplicationState.RUNNING) {
-        return
-      } else if (state == YarnApplicationState.FINISHED ||
-        state == YarnApplicationState.FAILED ||
-        state == YarnApplicationState.KILLED) {
-        throw new SparkException("Yarn application already ended," +
-          "might be killed or not able to launch application master.")
+    optionTuples.foreach { case (optionName, envVar, sparkProp) =>
+      if (System.getenv(envVar) != null) {
+        extraArgs += (optionName, System.getenv(envVar))
+      } else if (sc.getConf.contains(sparkProp)) {
+        extraArgs += (optionName, sc.getConf.get(sparkProp))
       }
+    }
+    extraArgs
+  }
 
-      Thread.sleep(1000)
+  /**
+   * Report the state of the application until it is running.
+   * If the application has finished, failed or been killed in the process, throw an exception.
+   * This assumes both `client` and `appId` have already been set.
+   */
+  private def waitForApplication(): Unit = {
+    assert(client != null && appId != null, "Application has not been submitted yet!")
+    val state = client.monitorApplication(appId, returnOnRunning = true) // blocking
+    if (state == YarnApplicationState.FINISHED ||
+      state == YarnApplicationState.FAILED ||
+      state == YarnApplicationState.KILLED) {
+      throw new SparkException("Yarn application has already ended! " +
+        "It might have been killed or unable to launch application master.")
+    }
+    if (state == YarnApplicationState.RUNNING) {
+      logInfo(s"Application $appId has started running.")
     }
   }
 
-  private def yarnApplicationStateCheckerThread(): Thread = {
+  /**
+   * Monitor the application state in a separate thread.
+   * If the application has exited for any reason, stop the SparkContext.
+   * This assumes both `client` and `appId` have already been set.
+   */
+  private def asyncMonitorApplication(): Unit = {
+    assert(client != null && appId != null, "Application has not been submitted yet!")
     val t = new Thread {
       override def run() {
         while (!stopping) {
           val report = client.getApplicationReport(appId)
           val state = report.getYarnApplicationState()
-          if (state == YarnApplicationState.FINISHED || state == YarnApplicationState.KILLED
-            || state == YarnApplicationState.FAILED) {
-            logError(s"Yarn application already ended: $state")
+          if (state == YarnApplicationState.FINISHED ||
+            state == YarnApplicationState.KILLED ||
+            state == YarnApplicationState.FAILED) {
+            logError(s"Yarn application has already exited with state $state!")
             sc.stop()
             stopping = true
           }
           Thread.sleep(1000L)
         }
-        checkerThread = null
         Thread.currentThread().interrupt()
       }
     }
-    t.setName("Yarn Application State Checker")
+    t.setName("Yarn application state monitor")
     t.setDaemon(true)
     t.start()
-    t
   }
 
+  /**
+   * Stop the scheduler. This assumes `start()` has already been called.
+   */
   override def stop() {
+    assert(client != null, "Attempted to stop this scheduler before starting it!")
     stopping = true
     super.stop()
-    client.stop
+    client.stop()
     logInfo("Stopped")
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c4022dd5/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
index c3b7a2c..9bd9161 100644
--- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
+++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
+import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.mockito.Matchers._
 import org.mockito.Mockito._
@@ -90,7 +90,7 @@ class ClientBaseSuite extends FunSuite with Matchers {
     val env = new MutableHashMap[String, String]()
     val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
 
-    ClientBase.populateClasspath(args, conf, sparkConf, env, None)
+    ClientBase.populateClasspath(args, conf, sparkConf, env)
 
     val cp = env("CLASSPATH").split(File.pathSeparator)
     s"$SPARK,$USER,$ADDED".split(",").foreach({ entry =>
@@ -114,10 +114,10 @@ class ClientBaseSuite extends FunSuite with Matchers {
     val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
 
     val client = spy(new DummyClient(args, conf, sparkConf, yarnConf))
-    doReturn(new Path("/")).when(client).copyRemoteFile(any(classOf[Path]),
+    doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
       any(classOf[Path]), anyShort(), anyBoolean())
 
-    var tempDir = Files.createTempDir();
+    val tempDir = Files.createTempDir()
     try {
       client.prepareLocalResources(tempDir.getAbsolutePath())
       sparkConf.getOption(ClientBase.CONF_SPARK_USER_JAR) should be (Some(USER))
@@ -247,13 +247,13 @@ class ClientBaseSuite extends FunSuite with Matchers {
 
   private class DummyClient(
       val args: ClientArguments,
-      val conf: Configuration,
+      val hadoopConf: Configuration,
       val sparkConf: SparkConf,
       val yarnConf: YarnConfiguration) extends ClientBase {
-
-    override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit =
-      throw new UnsupportedOperationException()
-
+    override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = ???
+    override def submitApplication(): ApplicationId = ???
+    override def getApplicationReport(appId: ApplicationId): ApplicationReport = ???
+    override def getClientToken(report: ApplicationReport): String = ???
   }
 
 }
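
A brief note on the DummyClient change above: the stubs now use ???, i.e. Predef.???, which throws scala.NotImplementedError, so a test that accidentally calls an unimplemented method fails loudly without a hand-written UnsupportedOperationException. A self-contained sketch, not tied to the suite; the trait and method shown are made up.

  // Sketch only: what ??? does when a stubbed method is actually invoked.
  trait FakeClient {
    def submitApplication(): String
  }

  object NotImplementedSketch {
    def main(args: Array[String]): Unit = {
      val client = new FakeClient { override def submitApplication(): String = ??? }
      try {
        client.submitApplication()
      } catch {
        case e: NotImplementedError => println("stub was invoked: " + e)
      }
    }
  }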

