Posted to commits@spark.apache.org by va...@apache.org on 2016/12/07 00:23:48 UTC

[11/18] spark git commit: [SPARK-18662] Move resource managers to separate directory

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
new file mode 100644
index 0000000..be419ce
--- /dev/null
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -0,0 +1,1541 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.io.{File, FileOutputStream, IOException, OutputStreamWriter}
+import java.net.{InetAddress, UnknownHostException, URI}
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+import java.util.{Properties, UUID}
+import java.util.zip.{ZipEntry, ZipOutputStream}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map}
+import scala.util.{Failure, Success, Try}
+import scala.util.control.NonFatal
+
+import com.google.common.base.Objects
+import com.google.common.io.Files
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs._
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.hadoop.io.DataOutputBuffer
+import org.apache.hadoop.mapreduce.MRJobConfig
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.hadoop.util.StringUtils
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+import org.apache.hadoop.yarn.api.protocolrecords._
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication}
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
+import org.apache.hadoop.yarn.util.Records
+
+import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.deploy.yarn.security.ConfigurableCredentialManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils}
+import org.apache.spark.util.{CallerContext, Utils}
+
+private[spark] class Client(
+    val args: ClientArguments,
+    val hadoopConf: Configuration,
+    val sparkConf: SparkConf)
+  extends Logging {
+
+  import Client._
+  import YarnSparkHadoopUtil._
+
+  def this(clientArgs: ClientArguments, spConf: SparkConf) =
+    this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)
+
+  private val yarnClient = YarnClient.createYarnClient
+  private val yarnConf = new YarnConfiguration(hadoopConf)
+
+  private val isClusterMode = sparkConf.get("spark.submit.deployMode", "client") == "cluster"
+
+  // AM related configurations
+  private val amMemory = if (isClusterMode) {
+    sparkConf.get(DRIVER_MEMORY).toInt
+  } else {
+    sparkConf.get(AM_MEMORY).toInt
+  }
+  private val amMemoryOverhead = {
+    val amMemoryOverheadEntry = if (isClusterMode) DRIVER_MEMORY_OVERHEAD else AM_MEMORY_OVERHEAD
+    sparkConf.get(amMemoryOverheadEntry).getOrElse(
+      math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
+  }
+  private val amCores = if (isClusterMode) {
+    sparkConf.get(DRIVER_CORES)
+  } else {
+    sparkConf.get(AM_CORES)
+  }
+
+  // Executor related configurations
+  private val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
+  private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
+    math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
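+  // Worked example (assuming the defaults MEMORY_OVERHEAD_FACTOR = 0.10 and
+  // MEMORY_OVERHEAD_MIN = 384 MB): with spark.executor.memory=8g, the container request sent to
+  // YARN is 8192 + max(0.10 * 8192, 384) = 9011 MB. The AM memory above uses the same formula.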
+
+  private val distCacheMgr = new ClientDistributedCacheManager()
+
+  private var loginFromKeytab = false
+  private var principal: String = null
+  private var keytab: String = null
+  private var credentials: Credentials = null
+
+  private val launcherBackend = new LauncherBackend() {
+    override def onStopRequest(): Unit = {
+      if (isClusterMode && appId != null) {
+        yarnClient.killApplication(appId)
+      } else {
+        setState(SparkAppHandle.State.KILLED)
+        stop()
+      }
+    }
+  }
+  private val fireAndForget = isClusterMode && !sparkConf.get(WAIT_FOR_APP_COMPLETION)
+
+  private var appId: ApplicationId = null
+
+  // The app staging dir, based on the STAGING_DIR configuration if set,
+  // otherwise based on the user's home directory.
+  private val appStagingBaseDir = sparkConf.get(STAGING_DIR).map { new Path(_) }
+    .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())
+
+  private val credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
+
+  def reportLauncherState(state: SparkAppHandle.State): Unit = {
+    launcherBackend.setState(state)
+  }
+
+  def stop(): Unit = {
+    launcherBackend.close()
+    yarnClient.stop()
+    // Unset the YARN mode system property, to allow switching between cluster types.
+    System.clearProperty("SPARK_YARN_MODE")
+  }
+
+  /**
+   * Submit an application running our ApplicationMaster to the ResourceManager.
+   *
+   * The stable Yarn API provides a convenience method (YarnClient#createApplication) for
+   * creating applications and setting up the application submission context. This was not
+   * available in the alpha API.
+   */
+  def submitApplication(): ApplicationId = {
+    var appId: ApplicationId = null
+    try {
+      launcherBackend.connect()
+      // Set up the credentials before doing anything else,
+      // so we don't have issues at any point.
+      setupCredentials()
+      yarnClient.init(yarnConf)
+      yarnClient.start()
+
+      logInfo("Requesting a new application from cluster with %d NodeManagers"
+        .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
+
+      // Get a new application from our RM
+      val newApp = yarnClient.createApplication()
+      val newAppResponse = newApp.getNewApplicationResponse()
+      appId = newAppResponse.getApplicationId()
+      reportLauncherState(SparkAppHandle.State.SUBMITTED)
+      launcherBackend.setAppId(appId.toString)
+
+      new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
+        Option(appId.toString)).setCurrentContext()
+
+      // Verify whether the cluster has enough resources for our AM
+      verifyClusterResources(newAppResponse)
+
+      // Set up the appropriate contexts to launch our AM
+      val containerContext = createContainerLaunchContext(newAppResponse)
+      val appContext = createApplicationSubmissionContext(newApp, containerContext)
+
+      // Finally, submit and monitor the application
+      logInfo(s"Submitting application $appId to ResourceManager")
+      yarnClient.submitApplication(appContext)
+      appId
+    } catch {
+      case e: Throwable =>
+        if (appId != null) {
+          cleanupStagingDir(appId)
+        }
+        throw e
+    }
+  }
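+
+  // Minimal usage sketch (illustrative; run() further below performs essentially these steps):
+  //   val client = new Client(new ClientArguments(args), sparkConf)
+  //   val appId = client.submitApplication()
+  //   val (state, finalStatus) = client.monitorApplication(appId)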
+
+  /**
+   * Clean up the application staging directory.
+   */
+  private def cleanupStagingDir(appId: ApplicationId): Unit = {
+    val stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
+    try {
+      val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
+      val fs = stagingDirPath.getFileSystem(hadoopConf)
+      if (!preserveFiles && fs.delete(stagingDirPath, true)) {
+        logInfo(s"Deleted staging directory $stagingDirPath")
+      }
+    } catch {
+      case ioe: IOException =>
+        logWarning("Failed to cleanup staging dir " + stagingDirPath, ioe)
+    }
+  }
+
+  /**
+   * Set up the context for submitting our ApplicationMaster.
+   * This uses the YarnClientApplication, which is not available in the Yarn alpha API.
+   */
+  def createApplicationSubmissionContext(
+      newApp: YarnClientApplication,
+      containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
+    val appContext = newApp.getApplicationSubmissionContext
+    appContext.setApplicationName(sparkConf.get("spark.app.name", "Spark"))
+    appContext.setQueue(sparkConf.get(QUEUE_NAME))
+    appContext.setAMContainerSpec(containerContext)
+    appContext.setApplicationType("SPARK")
+
+    sparkConf.get(APPLICATION_TAGS).foreach { tags =>
+      try {
+        // The setApplicationTags method was only introduced in Hadoop 2.4+, so we need to use
+        // reflection to set it, printing a warning if a tag was specified but the YARN version
+        // doesn't support it.
+        val method = appContext.getClass().getMethod(
+          "setApplicationTags", classOf[java.util.Set[String]])
+        method.invoke(appContext, new java.util.HashSet[String](tags.asJava))
+      } catch {
+        case e: NoSuchMethodException =>
+          logWarning(s"Ignoring ${APPLICATION_TAGS.key} because this version of " +
+            "YARN does not support it")
+      }
+    }
+    sparkConf.get(MAX_APP_ATTEMPTS) match {
+      case Some(v) => appContext.setMaxAppAttempts(v)
+      case None => logDebug(s"${MAX_APP_ATTEMPTS.key} is not set. " +
+          "Cluster's default value will be used.")
+    }
+
+    sparkConf.get(AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).foreach { interval =>
+      try {
+        val method = appContext.getClass().getMethod(
+          "setAttemptFailuresValidityInterval", classOf[Long])
+        method.invoke(appContext, interval: java.lang.Long)
+      } catch {
+        case e: NoSuchMethodException =>
+          logWarning(s"Ignoring ${AM_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS.key} because " +
+            "the version of YARN does not support it")
+      }
+    }
+
+    val capability = Records.newRecord(classOf[Resource])
+    capability.setMemory(amMemory + amMemoryOverhead)
+    capability.setVirtualCores(amCores)
+
+    sparkConf.get(AM_NODE_LABEL_EXPRESSION) match {
+      case Some(expr) =>
+        try {
+          val amRequest = Records.newRecord(classOf[ResourceRequest])
+          amRequest.setResourceName(ResourceRequest.ANY)
+          amRequest.setPriority(Priority.newInstance(0))
+          amRequest.setCapability(capability)
+          amRequest.setNumContainers(1)
+          val method = amRequest.getClass.getMethod("setNodeLabelExpression", classOf[String])
+          method.invoke(amRequest, expr)
+
+          val setResourceRequestMethod =
+            appContext.getClass.getMethod("setAMContainerResourceRequest", classOf[ResourceRequest])
+          setResourceRequestMethod.invoke(appContext, amRequest)
+        } catch {
+          case e: NoSuchMethodException =>
+            logWarning(s"Ignoring ${AM_NODE_LABEL_EXPRESSION.key} because the version " +
+              "of YARN does not support it")
+            appContext.setResource(capability)
+        }
+      case None =>
+        appContext.setResource(capability)
+    }
+
+    sparkConf.get(ROLLED_LOG_INCLUDE_PATTERN).foreach { includePattern =>
+      try {
+        val logAggregationContext = Records.newRecord(
+          Utils.classForName("org.apache.hadoop.yarn.api.records.LogAggregationContext"))
+          .asInstanceOf[Object]
+
+        val setRolledLogsIncludePatternMethod =
+          logAggregationContext.getClass.getMethod("setRolledLogsIncludePattern", classOf[String])
+        setRolledLogsIncludePatternMethod.invoke(logAggregationContext, includePattern)
+
+        sparkConf.get(ROLLED_LOG_EXCLUDE_PATTERN).foreach { excludePattern =>
+          val setRolledLogsExcludePatternMethod =
+            logAggregationContext.getClass.getMethod("setRolledLogsExcludePattern", classOf[String])
+          setRolledLogsExcludePatternMethod.invoke(logAggregationContext, excludePattern)
+        }
+
+        val setLogAggregationContextMethod =
+          appContext.getClass.getMethod("setLogAggregationContext",
+            Utils.classForName("org.apache.hadoop.yarn.api.records.LogAggregationContext"))
+        setLogAggregationContextMethod.invoke(appContext, logAggregationContext)
+      } catch {
+        case NonFatal(e) =>
+          logWarning(s"Ignoring ${ROLLED_LOG_INCLUDE_PATTERN.key} because the version of YARN " +
+            s"does not support it", e)
+      }
+    }
+
+    appContext
+  }
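+
+  // The reflective calls above follow one pattern for optional YARN APIs that may be missing in
+  // older versions. A sketch of that pattern with a hypothetical helper (not used by this class):
+  //   def setIfSupported(target: AnyRef, method: String, argType: Class[_], arg: AnyRef): Unit =
+  //     try target.getClass.getMethod(method, argType).invoke(target, arg)
+  //     catch { case _: NoSuchMethodException => logWarning(s"$method not supported") }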
+
+  /** Set up security tokens for launching our ApplicationMaster container. */
+  private def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = {
+    val dob = new DataOutputBuffer
+    credentials.writeTokenStorageToStream(dob)
+    amContainer.setTokens(ByteBuffer.wrap(dob.getData))
+  }
+
+  /** Get the application report from the ResourceManager for an application we have submitted. */
+  def getApplicationReport(appId: ApplicationId): ApplicationReport =
+    yarnClient.getApplicationReport(appId)
+
+  /**
+   * Return the security token used by this client to communicate with the ApplicationMaster.
+   * If no security is enabled, the token returned by the report is null.
+   */
+  private def getClientToken(report: ApplicationReport): String =
+    Option(report.getClientToAMToken).map(_.toString).getOrElse("")
+
+  /**
+   * Fail fast if we have requested more resources per container than is available in the cluster.
+   */
+  private def verifyClusterResources(newAppResponse: GetNewApplicationResponse): Unit = {
+    val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
+    logInfo("Verifying our application has not requested more than the maximum " +
+      s"memory capability of the cluster ($maxMem MB per container)")
+    val executorMem = executorMemory + executorMemoryOverhead
+    if (executorMem > maxMem) {
+      throw new IllegalArgumentException(s"Required executor memory ($executorMemory" +
+        s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster! " +
+        "Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or " +
+        "'yarn.nodemanager.resource.memory-mb'.")
+    }
+    val amMem = amMemory + amMemoryOverhead
+    if (amMem > maxMem) {
+      throw new IllegalArgumentException(s"Required AM memory ($amMemory" +
+        s"+$amMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster! " +
+        "Please increase the value of 'yarn.scheduler.maximum-allocation-mb'.")
+    }
+    logInfo("Will allocate AM container, with %d MB memory including %d MB overhead".format(
+      amMem,
+      amMemoryOverhead))
+
+    // We could add checks to make sure the entire cluster has enough resources but that involves
+    // getting all the node reports and computing it ourselves.
+  }
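+
+  // Worked example (hypothetical numbers): with spark.executor.memory=4g and the default
+  // overhead of max(0.10 * 4096, 384) = 409 MB, a cluster whose
+  // yarn.scheduler.maximum-allocation-mb is 4096 would reject the request, since
+  // 4096 + 409 = 4505 MB exceeds the 4096 MB maximum container size.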
+
+  /**
+   * Copy the given file to a remote file system (e.g. HDFS) if needed.
+   * The file is only copied if the source and destination file systems are different. This is used
+   * for preparing resources for launching the ApplicationMaster container. Exposed for testing.
+   */
+  private[yarn] def copyFileToRemote(
+      destDir: Path,
+      srcPath: Path,
+      replication: Short,
+      force: Boolean = false,
+      destName: Option[String] = None): Path = {
+    val destFs = destDir.getFileSystem(hadoopConf)
+    val srcFs = srcPath.getFileSystem(hadoopConf)
+    var destPath = srcPath
+    if (force || !compareFs(srcFs, destFs)) {
+      destPath = new Path(destDir, destName.getOrElse(srcPath.getName()))
+      logInfo(s"Uploading resource $srcPath -> $destPath")
+      FileUtil.copy(srcFs, srcPath, destFs, destPath, false, hadoopConf)
+      destFs.setReplication(destPath, replication)
+      destFs.setPermission(destPath, new FsPermission(APP_FILE_PERMISSION))
+    } else {
+      logInfo(s"Source and destination file systems are the same. Not copying $srcPath")
+    }
+    // Resolve any symlinks in the URI path, so that using a "current" symlink to point to a
+    // specific version shows that specific version in the distributed cache configuration.
+    val qualifiedDestPath = destFs.makeQualified(destPath)
+    val fc = FileContext.getFileContext(qualifiedDestPath.toUri(), hadoopConf)
+    fc.resolvePath(qualifiedDestPath)
+  }
+
+  /**
+   * Upload any resources to the distributed cache if needed. If a resource is intended to be
+   * consumed locally, set up the appropriate config for downstream code to handle it properly.
+   * This is used for setting up a container launch context for our ApplicationMaster.
+   * Exposed for testing.
+   */
+  def prepareLocalResources(
+      destDir: Path,
+      pySparkArchives: Seq[String]): HashMap[String, LocalResource] = {
+    logInfo("Preparing resources for our AM container")
+    // Upload Spark and the application JAR to the remote file system if necessary,
+    // and add them as local resources to the application master.
+    val fs = destDir.getFileSystem(hadoopConf)
+
+    // Merge credentials obtained from registered providers
+    val nearestTimeOfNextRenewal = credentialManager.obtainCredentials(hadoopConf, credentials)
+
+    if (credentials != null) {
+      logDebug(YarnSparkHadoopUtil.get.dumpTokens(credentials).mkString("\n"))
+    }
+
+    // If we logged in with a principal and keytab, the credentials can be renewed some time
+    // after the current time, so we should pass the next renewal and update times to the
+    // credential renewer and updater.
+    if (loginFromKeytab && nearestTimeOfNextRenewal > System.currentTimeMillis() &&
+      nearestTimeOfNextRenewal != Long.MaxValue) {
+
+      // The valid renewal time is 75% of the way to the next renewal time, and the valid update
+      // time is slightly later (80% of the way). This is to make sure credentials are renewed
+      // and updated before they expire.
+      val currTime = System.currentTimeMillis()
+      val renewalTime = (nearestTimeOfNextRenewal - currTime) * 0.75 + currTime
+      val updateTime = (nearestTimeOfNextRenewal - currTime) * 0.8 + currTime
+
+      sparkConf.set(CREDENTIALS_RENEWAL_TIME, renewalTime.toLong)
+      sparkConf.set(CREDENTIALS_UPDATE_TIME, updateTime.toLong)
+    }
+
+    // Used to keep track of URIs added to the distributed cache. If the same URI is added
+    // multiple times, YARN will fail to launch containers for the app with an internal
+    // error.
+    val distributedUris = new HashSet[String]
+    // Used to keep track of the names of files added to the distributed cache. If files with
+    // the same name but different paths are added multiple times, YARN will fail to launch
+    // containers for the app with an internal error.
+    val distributedNames = new HashSet[String]
+
+    val replication = sparkConf.get(STAGING_FILE_REPLICATION).map(_.toShort)
+      .getOrElse(fs.getDefaultReplication(destDir))
+    val localResources = HashMap[String, LocalResource]()
+    FileSystem.mkdirs(fs, destDir, new FsPermission(STAGING_DIR_PERMISSION))
+
+    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+
+    def addDistributedUri(uri: URI): Boolean = {
+      val uriStr = uri.toString()
+      val fileName = new File(uri.getPath).getName
+      if (distributedUris.contains(uriStr)) {
+        logWarning(s"Same path resource $uri added multiple times to distributed cache.")
+        false
+      } else if (distributedNames.contains(fileName)) {
+        logWarning(s"Same name resource $uri added multiple times to distributed cache")
+        false
+      } else {
+        distributedUris += uriStr
+        distributedNames += fileName
+        true
+      }
+    }
+
+    /**
+     * Distribute a file to the cluster.
+     *
+     * If the file's path is a "local:" URI, it's actually not distributed. Other files are copied
+     * to HDFS (if not already there) and added to the application's distributed cache.
+     *
+     * @param path URI of the file to distribute.
+     * @param resType Type of resource being distributed.
+     * @param destName Name of the file in the distributed cache.
+     * @param targetDir Subdirectory where to place the file.
+     * @param appMasterOnly Whether to distribute only to the AM.
+     * @return A 2-tuple. First item is whether the file is a "local:" URI. Second item is the
+     *         localized path for non-local paths, or the input `path` for local paths.
+     *         The localized path will be null if the URI has already been added to the cache.
+     */
+    def distribute(
+        path: String,
+        resType: LocalResourceType = LocalResourceType.FILE,
+        destName: Option[String] = None,
+        targetDir: Option[String] = None,
+        appMasterOnly: Boolean = false): (Boolean, String) = {
+      val trimmedPath = path.trim()
+      val localURI = Utils.resolveURI(trimmedPath)
+      if (localURI.getScheme != LOCAL_SCHEME) {
+        if (addDistributedUri(localURI)) {
+          val localPath = getQualifiedLocalPath(localURI, hadoopConf)
+          val linkname = targetDir.map(_ + "/").getOrElse("") +
+            destName.orElse(Option(localURI.getFragment())).getOrElse(localPath.getName())
+          val destPath = copyFileToRemote(destDir, localPath, replication)
+          val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
+          distCacheMgr.addResource(
+            destFs, hadoopConf, destPath, localResources, resType, linkname, statCache,
+            appMasterOnly = appMasterOnly)
+          (false, linkname)
+        } else {
+          (false, null)
+        }
+      } else {
+        (true, trimmedPath)
+      }
+    }
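+
+    // Illustrative outcomes of distribute() under the contract described above (paths are
+    // made-up examples):
+    //   distribute("local:/opt/libs/dep.jar")  => (true, "local:/opt/libs/dep.jar")  // in place
+    //   distribute("/tmp/dep.jar")             => (false, "dep.jar")        // uploaded + cached
+    //   distribute("/tmp/dep.jar")  // again   => (false, null)             // duplicate URI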
+
+    // If we passed in a keytab, make sure we copy the keytab to the staging directory on
+    // HDFS, and set up the relevant environment, so the AM can log in again.
+    if (loginFromKeytab) {
+      logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" +
+        " via the YARN Secure Distributed Cache.")
+      val (_, localizedPath) = distribute(keytab,
+        destName = sparkConf.get(KEYTAB),
+        appMasterOnly = true)
+      require(localizedPath != null, "Keytab file already distributed.")
+    }
+
+    /**
+     * Add Spark to the cache. There are two settings that control what files to add to the cache:
+     * - if a Spark archive is defined, use the archive. The archive is expected to contain
+     *   jar files at its root directory.
+     * - if a list of jars is provided, filter the non-local ones, resolve globs, and
+     *   add the found files to the cache.
+     *
+     * Note that the archive cannot be a "local" URI. If none of the above settings are found,
+     * then upload all files found in $SPARK_HOME/jars.
+     */
+    val sparkArchive = sparkConf.get(SPARK_ARCHIVE)
+    if (sparkArchive.isDefined) {
+      val archive = sparkArchive.get
+      require(!isLocalUri(archive), s"${SPARK_ARCHIVE.key} cannot be a local URI.")
+      distribute(Utils.resolveURI(archive).toString,
+        resType = LocalResourceType.ARCHIVE,
+        destName = Some(LOCALIZED_LIB_DIR))
+    } else {
+      sparkConf.get(SPARK_JARS) match {
+        case Some(jars) =>
+          // Break the list of jars to upload, and resolve globs.
+          val localJars = new ArrayBuffer[String]()
+          jars.foreach { jar =>
+            if (!isLocalUri(jar)) {
+              val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
+              val pathFs = FileSystem.get(path.toUri(), hadoopConf)
+              pathFs.globStatus(path).filter(_.isFile()).foreach { entry =>
+                distribute(entry.getPath().toUri().toString(),
+                  targetDir = Some(LOCALIZED_LIB_DIR))
+              }
+            } else {
+              localJars += jar
+            }
+          }
+
+          // Propagate the local URIs to the containers using the configuration.
+          sparkConf.set(SPARK_JARS, localJars)
+
+        case None =>
+          // No configuration, so fall back to uploading local jar files.
+          logWarning(s"Neither ${SPARK_JARS.key} nor ${SPARK_ARCHIVE.key} is set, falling back " +
+            "to uploading libraries under SPARK_HOME.")
+          val jarsDir = new File(YarnCommandBuilderUtils.findJarsDir(
+            sparkConf.getenv("SPARK_HOME")))
+          val jarsArchive = File.createTempFile(LOCALIZED_LIB_DIR, ".zip",
+            new File(Utils.getLocalDir(sparkConf)))
+          val jarsStream = new ZipOutputStream(new FileOutputStream(jarsArchive))
+
+          try {
+            jarsStream.setLevel(0)
+            jarsDir.listFiles().foreach { f =>
+              if (f.isFile && f.getName.toLowerCase().endsWith(".jar") && f.canRead) {
+                jarsStream.putNextEntry(new ZipEntry(f.getName))
+                Files.copy(f, jarsStream)
+                jarsStream.closeEntry()
+              }
+            }
+          } finally {
+            jarsStream.close()
+          }
+
+          distribute(jarsArchive.toURI.getPath,
+            resType = LocalResourceType.ARCHIVE,
+            destName = Some(LOCALIZED_LIB_DIR))
+      }
+    }
+
+    /**
+     * Copy the user jar to the distributed cache if its scheme is not "local".
+     * Otherwise, set the corresponding key in our SparkConf so it is handled downstream.
+     */
+    Option(args.userJar).filter(_.trim.nonEmpty).foreach { jar =>
+      val (isLocal, localizedPath) = distribute(jar, destName = Some(APP_JAR_NAME))
+      if (isLocal) {
+        require(localizedPath != null, s"Path $jar already distributed")
+        // If the resource is intended for local use only, handle this downstream
+        // by setting the appropriate property
+        sparkConf.set(APP_JAR, localizedPath)
+      }
+    }
+
+    /**
+     * Do the same for any additional resources passed in through ClientArguments.
+     * Each resource category is represented by a 3-tuple of:
+     *   (1) comma separated list of resources in this category,
+     *   (2) resource type, and
+     *   (3) whether to add these resources to the classpath
+     */
+    val cachedSecondaryJarLinks = ListBuffer.empty[String]
+    List(
+      (sparkConf.get(JARS_TO_DISTRIBUTE), LocalResourceType.FILE, true),
+      (sparkConf.get(FILES_TO_DISTRIBUTE), LocalResourceType.FILE, false),
+      (sparkConf.get(ARCHIVES_TO_DISTRIBUTE), LocalResourceType.ARCHIVE, false)
+    ).foreach { case (flist, resType, addToClasspath) =>
+      flist.foreach { file =>
+        val (_, localizedPath) = distribute(file, resType = resType)
+        // If addToClasspath is set, ignore jars that were already added to the distributed cache.
+        if (addToClasspath) {
+          if (localizedPath != null) {
+            cachedSecondaryJarLinks += localizedPath
+          }
+        } else {
+          if (localizedPath == null) {
+            throw new IllegalArgumentException(s"Attempt to add ($file) multiple times" +
+              " to the distributed cache.")
+          }
+        }
+      }
+    }
+    if (cachedSecondaryJarLinks.nonEmpty) {
+      sparkConf.set(SECONDARY_JARS, cachedSecondaryJarLinks)
+    }
+
+    if (isClusterMode && args.primaryPyFile != null) {
+      distribute(args.primaryPyFile, appMasterOnly = true)
+    }
+
+    pySparkArchives.foreach { f => distribute(f) }
+
+    // The python files list needs special treatment: all files that are not archives need to
+    // be placed in a subdirectory that will be added to PYTHONPATH.
+    sparkConf.get(PY_FILES).foreach { f =>
+      val targetDir = if (f.endsWith(".py")) Some(LOCALIZED_PYTHON_DIR) else None
+      distribute(f, targetDir = targetDir)
+    }
+
+    // Update the configuration with all the distributed files, minus the conf archive. The
+    // conf archive will be handled by the AM differently so that we avoid having to send
+    // this configuration by other means. See SPARK-14602 for one reason why this is needed.
+    distCacheMgr.updateConfiguration(sparkConf)
+
+    // Upload the conf archive to HDFS manually, and record its location in the configuration.
+    // This will allow the AM to know where the conf archive is in HDFS, so that it can be
+    // distributed to the containers.
+    //
+    // This code forces the archive to be copied, so that unit tests pass (since in that case both
+    // file systems are the same and the archive wouldn't normally be copied). In most (all?)
+    // deployments, the archive would be copied anyway, since it's a temp file in the local file
+    // system.
+    val remoteConfArchivePath = new Path(destDir, LOCALIZED_CONF_ARCHIVE)
+    val remoteFs = FileSystem.get(remoteConfArchivePath.toUri(), hadoopConf)
+    sparkConf.set(CACHED_CONF_ARCHIVE, remoteConfArchivePath.toString())
+
+    val localConfArchive = new Path(createConfArchive().toURI())
+    copyFileToRemote(destDir, localConfArchive, replication, force = true,
+      destName = Some(LOCALIZED_CONF_ARCHIVE))
+
+    // Manually add the config archive to the cache manager so that the AM is launched with
+    // the proper files set up.
+    distCacheMgr.addResource(
+      remoteFs, hadoopConf, remoteConfArchivePath, localResources, LocalResourceType.ARCHIVE,
+      LOCALIZED_CONF_DIR, statCache, appMasterOnly = false)
+
+    // Clear the cache-related entries from the configuration to avoid them polluting the
+    // UI's environment page. This works for client mode; for cluster mode, this is handled
+    // by the AM.
+    CACHE_CONFIGS.foreach(sparkConf.remove)
+
+    localResources
+  }
+
+  /**
+   * Create an archive with the config files for distribution.
+   *
+   * These will be used by AM and executors. The files are zipped and added to the job as an
+   * archive, so that YARN will explode it when distributing to AM and executors. This directory
+   * is then added to the classpath of the AM and executor processes, just to make sure that
+   * everybody is using the same default config.
+   *
+   * This follows the order of precedence set by the startup scripts, in which HADOOP_CONF_DIR
+   * shows up in the classpath before YARN_CONF_DIR.
+   *
+   * Currently this makes a shallow copy of the conf directory. If there are cases where a
+   * Hadoop config directory contains subdirectories, this code will have to be fixed.
+   *
+   * The archive also contains some Spark configuration. Namely, it saves the contents of
+   * SparkConf in a file to be loaded by the AM process.
+   */
+  private def createConfArchive(): File = {
+    val hadoopConfFiles = new HashMap[String, File]()
+
+    // Upload the $SPARK_CONF_DIR/log4j.properties file to the distributed cache to make sure
+    // that the executors will use the latest configuration instead of the default values. This
+    // is required when the user changes log4j.properties directly to set the log configuration.
+    // If the configuration file is provided through --files, the executors will take their
+    // configuration from --files instead of $SPARK_CONF_DIR/log4j.properties.
+
+    // Also upload metrics.properties to the distributed cache if it exists on the classpath.
+    // If the user specifies this file using --files, the executors will use that one instead.
+    for { prop <- Seq("log4j.properties", "metrics.properties")
+          url <- Option(Utils.getContextOrSparkClassLoader.getResource(prop))
+          if url.getProtocol == "file" } {
+      hadoopConfFiles(prop) = new File(url.getPath)
+    }
+
+    Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR").foreach { envKey =>
+      sys.env.get(envKey).foreach { path =>
+        val dir = new File(path)
+        if (dir.isDirectory()) {
+          val files = dir.listFiles()
+          if (files == null) {
+            logWarning("Failed to list files under directory " + dir)
+          } else {
+            files.foreach { file =>
+              if (file.isFile && !hadoopConfFiles.contains(file.getName())) {
+                hadoopConfFiles(file.getName()) = file
+              }
+            }
+          }
+        }
+      }
+    }
+
+    val confArchive = File.createTempFile(LOCALIZED_CONF_DIR, ".zip",
+      new File(Utils.getLocalDir(sparkConf)))
+    val confStream = new ZipOutputStream(new FileOutputStream(confArchive))
+
+    try {
+      confStream.setLevel(0)
+      hadoopConfFiles.foreach { case (name, file) =>
+        if (file.canRead()) {
+          confStream.putNextEntry(new ZipEntry(name))
+          Files.copy(file, confStream)
+          confStream.closeEntry()
+        }
+      }
+
+      // Save Spark configuration to a file in the archive.
+      val props = new Properties()
+      sparkConf.getAll.foreach { case (k, v) => props.setProperty(k, v) }
+      confStream.putNextEntry(new ZipEntry(SPARK_CONF_FILE))
+      val writer = new OutputStreamWriter(confStream, StandardCharsets.UTF_8)
+      props.store(writer, "Spark configuration.")
+      writer.flush()
+      confStream.closeEntry()
+    } finally {
+      confStream.close()
+    }
+    confArchive
+  }
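+
+  // Resulting archive layout (illustrative): __spark_conf__.zip containing log4j.properties and
+  // metrics.properties (when found on the classpath), the files from HADOOP_CONF_DIR and
+  // YARN_CONF_DIR, and __spark_conf__.properties holding the serialized SparkConf.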
+
+  /**
+   * Set up the environment for launching our ApplicationMaster container.
+   */
+  private def setupLaunchEnv(
+      stagingDirPath: Path,
+      pySparkArchives: Seq[String]): HashMap[String, String] = {
+    logInfo("Setting up the launch environment for our AM container")
+    val env = new HashMap[String, String]()
+    populateClasspath(args, yarnConf, sparkConf, env, sparkConf.get(DRIVER_CLASS_PATH))
+    env("SPARK_YARN_MODE") = "true"
+    env("SPARK_YARN_STAGING_DIR") = stagingDirPath.toString
+    env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
+    if (loginFromKeytab) {
+      val credentialsFile = "credentials-" + UUID.randomUUID().toString
+      sparkConf.set(CREDENTIALS_FILE_PATH, new Path(stagingDirPath, credentialsFile).toString)
+      logInfo(s"Credentials file set to: $credentialsFile")
+    }
+
+    // Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.*
+    val amEnvPrefix = "spark.yarn.appMasterEnv."
+    sparkConf.getAll
+      .filter { case (k, v) => k.startsWith(amEnvPrefix) }
+      .map { case (k, v) => (k.substring(amEnvPrefix.length), v) }
+      .foreach { case (k, v) => YarnSparkHadoopUtil.addPathToEnvironment(env, k, v) }
+
+    // Keep this for backwards compatibility but users should move to the config
+    sys.env.get("SPARK_YARN_USER_ENV").foreach { userEnvs =>
+      // Allow users to specify some environment variables.
+      YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs)
+      // Pass SPARK_YARN_USER_ENV itself to the AM so it can use it to set up executor environments.
+      env("SPARK_YARN_USER_ENV") = userEnvs
+    }
+
+    // If pyFiles contains any .py files, we need to add LOCALIZED_PYTHON_DIR to the PYTHONPATH
+    // of the container processes too. Add all non-.py files directly to PYTHONPATH.
+    //
+    // NOTE: the code currently does not handle .py files defined with a "local:" scheme.
+    val pythonPath = new ListBuffer[String]()
+    val (pyFiles, pyArchives) = sparkConf.get(PY_FILES).partition(_.endsWith(".py"))
+    if (pyFiles.nonEmpty) {
+      pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
+        LOCALIZED_PYTHON_DIR)
+    }
+    (pySparkArchives ++ pyArchives).foreach { path =>
+      val uri = Utils.resolveURI(path)
+      if (uri.getScheme != LOCAL_SCHEME) {
+        pythonPath += buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
+          new Path(uri).getName())
+      } else {
+        pythonPath += uri.getPath()
+      }
+    }
+
+    // Finally, update the Spark config to propagate PYTHONPATH to the AM and executors.
+    if (pythonPath.nonEmpty) {
+      val pythonPathStr = (sys.env.get("PYTHONPATH") ++ pythonPath)
+        .mkString(YarnSparkHadoopUtil.getClassPathSeparator)
+      env("PYTHONPATH") = pythonPathStr
+      sparkConf.setExecutorEnv("PYTHONPATH", pythonPathStr)
+    }
+
+    // In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to
+    // executors. But we can't just set spark.executor.extraJavaOptions, because the driver's
+    // SparkContext will not let that set spark* system properties, which is expected behavior for
+    // Yarn clients. So propagate it through the environment.
+    //
+    // Note that to warn the user about the deprecation in cluster mode, some code from
+    // SparkConf#validateSettings() is duplicated here (to avoid triggering the condition
+    // described above).
+    if (isClusterMode) {
+      sys.env.get("SPARK_JAVA_OPTS").foreach { value =>
+        val warning =
+          s"""
+            |SPARK_JAVA_OPTS was detected (set to '$value').
+            |This is deprecated in Spark 1.0+.
+            |
+            |Please instead use:
+            | - ./spark-submit with conf/spark-defaults.conf to set defaults for an application
+            | - ./spark-submit with --driver-java-options to set -X options for a driver
+            | - spark.executor.extraJavaOptions to set -X options for executors
+          """.stripMargin
+        logWarning(warning)
+        for (proc <- Seq("driver", "executor")) {
+          val key = s"spark.$proc.extraJavaOptions"
+          if (sparkConf.contains(key)) {
+            throw new SparkException(s"Found both $key and SPARK_JAVA_OPTS. Use only the former.")
+          }
+        }
+        env("SPARK_JAVA_OPTS") = value
+      }
+      // propagate PYSPARK_DRIVER_PYTHON and PYSPARK_PYTHON to driver in cluster mode
+      Seq("PYSPARK_DRIVER_PYTHON", "PYSPARK_PYTHON").foreach { envname =>
+        if (!env.contains(envname)) {
+          sys.env.get(envname).foreach(env(envname) = _)
+        }
+      }
+    }
+
+    sys.env.get(ENV_DIST_CLASSPATH).foreach { dcp =>
+      env(ENV_DIST_CLASSPATH) = dcp
+    }
+
+    env
+  }
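+
+  // Example of the appMasterEnv handling above (hypothetical key): submitting with
+  //   --conf spark.yarn.appMasterEnv.JAVA_TOOL_OPTIONS=-Dfoo=bar
+  // results in JAVA_TOOL_OPTIONS=-Dfoo=bar being added to the AM container environment.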
+
+  /**
+   * Set up a ContainerLaunchContext to launch our ApplicationMaster container.
+   * This sets up the launch environment, java options, and the command for launching the AM.
+   */
+  private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
+    : ContainerLaunchContext = {
+    logInfo("Setting up container launch context for our AM")
+    val appId = newAppResponse.getApplicationId
+    val appStagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
+    val pySparkArchives =
+      if (sparkConf.get(IS_PYTHON_APP)) {
+        findPySparkArchives()
+      } else {
+        Nil
+      }
+    val launchEnv = setupLaunchEnv(appStagingDirPath, pySparkArchives)
+    val localResources = prepareLocalResources(appStagingDirPath, pySparkArchives)
+
+    val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
+    amContainer.setLocalResources(localResources.asJava)
+    amContainer.setEnvironment(launchEnv.asJava)
+
+    val javaOpts = ListBuffer[String]()
+
+    // Set the environment variable through a command prefix
+    // to append to the existing value of the variable
+    var prefixEnv: Option[String] = None
+
+    // Add Xmx for AM memory
+    javaOpts += "-Xmx" + amMemory + "m"
+
+    val tmpDir = new Path(
+      YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
+      YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR
+    )
+    javaOpts += "-Djava.io.tmpdir=" + tmpDir
+
+    // TODO: Remove once the cpuset version is pushed out.
+    // The context: the default GC for server-class machines ends up using all cores to do GC,
+    // so if there are multiple containers on the same node, Spark's GC affects the performance
+    // of all the other containers (which may themselves be Spark containers).
+    // Instead of using this, rely on YARN cpusets to enforce "proper" Spark behavior in
+    // multi-tenant environments. It is unclear how the default Java GC behaves when limited to
+    // a subset of the cores on a node.
+    val useConcurrentAndIncrementalGC = launchEnv.get("SPARK_USE_CONC_INCR_GC").exists(_.toBoolean)
+    if (useConcurrentAndIncrementalGC) {
+      // In our experiments, using the (default) throughput collector has severe performance
+      // ramifications on multi-tenant machines.
+      javaOpts += "-XX:+UseConcMarkSweepGC"
+      javaOpts += "-XX:MaxTenuringThreshold=31"
+      javaOpts += "-XX:SurvivorRatio=8"
+      javaOpts += "-XX:+CMSIncrementalMode"
+      javaOpts += "-XX:+CMSIncrementalPacing"
+      javaOpts += "-XX:CMSIncrementalDutyCycleMin=0"
+      javaOpts += "-XX:CMSIncrementalDutyCycle=10"
+    }
+
+    // Include driver-specific java options if we are launching a driver
+    if (isClusterMode) {
+      val driverOpts = sparkConf.get(DRIVER_JAVA_OPTIONS).orElse(sys.env.get("SPARK_JAVA_OPTS"))
+      driverOpts.foreach { opts =>
+        javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell)
+      }
+      val libraryPaths = Seq(sparkConf.get(DRIVER_LIBRARY_PATH),
+        sys.props.get("spark.driver.libraryPath")).flatten
+      if (libraryPaths.nonEmpty) {
+        prefixEnv = Some(getClusterPath(sparkConf, Utils.libraryPathEnvPrefix(libraryPaths)))
+      }
+      if (sparkConf.get(AM_JAVA_OPTIONS).isDefined) {
+        logWarning(s"${AM_JAVA_OPTIONS.key} will not take effect in cluster mode")
+      }
+    } else {
+      // Validate and include YARN AM-specific java options in yarn-client mode.
+      sparkConf.get(AM_JAVA_OPTIONS).foreach { opts =>
+        if (opts.contains("-Dspark")) {
+          val msg = s"${AM_JAVA_OPTIONS.key} is not allowed to set Spark options (was '$opts')."
+          throw new SparkException(msg)
+        }
+        if (opts.contains("-Xmx")) {
+          val msg = s"${AM_JAVA_OPTIONS.key} is not allowed to specify max heap memory settings " +
+            s"(was '$opts'). Use spark.yarn.am.memory instead."
+          throw new SparkException(msg)
+        }
+        javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell)
+      }
+      sparkConf.get(AM_LIBRARY_PATH).foreach { paths =>
+        prefixEnv = Some(getClusterPath(sparkConf, Utils.libraryPathEnvPrefix(Seq(paths))))
+      }
+    }
+
+    // For log4j configuration to reference
+    javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)
+    YarnCommandBuilderUtils.addPermGenSizeOpt(javaOpts)
+
+    val userClass =
+      if (isClusterMode) {
+        Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass))
+      } else {
+        Nil
+      }
+    val userJar =
+      if (args.userJar != null) {
+        Seq("--jar", args.userJar)
+      } else {
+        Nil
+      }
+    val primaryPyFile =
+      if (isClusterMode && args.primaryPyFile != null) {
+        Seq("--primary-py-file", new Path(args.primaryPyFile).getName())
+      } else {
+        Nil
+      }
+    val primaryRFile =
+      if (args.primaryRFile != null) {
+        Seq("--primary-r-file", args.primaryRFile)
+      } else {
+        Nil
+      }
+    val amClass =
+      if (isClusterMode) {
+        Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
+      } else {
+        Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
+      }
+    if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) {
+      args.userArgs = ArrayBuffer(args.primaryRFile) ++ args.userArgs
+    }
+    val userArgs = args.userArgs.flatMap { arg =>
+      Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
+    }
+    val amArgs =
+      Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++
+        userArgs ++ Seq(
+          "--properties-file", buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
+            LOCALIZED_CONF_DIR, SPARK_CONF_FILE))
+
+    // Command for the ApplicationMaster
+    val commands = prefixEnv ++ Seq(
+        YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server"
+      ) ++
+      javaOpts ++ amArgs ++
+      Seq(
+        "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
+        "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
+
+    // TODO: it would be nicer to just make sure there are no null commands here
+    val printableCommands = commands.map(s => if (s == null) "null" else s).toList
+    amContainer.setCommands(printableCommands.asJava)
+
+    logDebug("===============================================================================")
+    logDebug("YARN AM launch context:")
+    logDebug(s"    user class: ${Option(args.userClass).getOrElse("N/A")}")
+    logDebug("    env:")
+    launchEnv.foreach { case (k, v) => logDebug(s"        $k -> $v") }
+    logDebug("    resources:")
+    localResources.foreach { case (k, v) => logDebug(s"        $k -> $v")}
+    logDebug("    command:")
+    logDebug(s"        ${printableCommands.mkString(" ")}")
+    logDebug("===============================================================================")
+
+    // send the acl settings into YARN to control who has access via YARN interfaces
+    val securityManager = new SecurityManager(sparkConf)
+    amContainer.setApplicationACLs(
+      YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager).asJava)
+    setupSecurityToken(amContainer)
+    amContainer
+  }
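+
+  // Rough shape of the resulting AM command in cluster mode (placeholders, not a literal value):
+  //   {{JAVA_HOME}}/bin/java -server -Xmx1024m -Djava.io.tmpdir={{PWD}}/tmp <javaOpts> \
+  //     org.apache.spark.deploy.yarn.ApplicationMaster --class <userClass> --jar <userJar> \
+  //     --properties-file {{PWD}}/__spark_conf__/__spark_conf__.properties \
+  //     1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr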
+
+  def setupCredentials(): Unit = {
+    loginFromKeytab = sparkConf.contains(PRINCIPAL.key)
+    if (loginFromKeytab) {
+      principal = sparkConf.get(PRINCIPAL).get
+      keytab = sparkConf.get(KEYTAB).orNull
+
+      require(keytab != null, "Keytab must be specified when principal is specified.")
+      logInfo("Attempting to login to the Kerberos" +
+        s" using principal: $principal and keytab: $keytab")
+      val f = new File(keytab)
+      // Generate a file name that can be used for the keytab file, one that does not conflict
+      // with any user file.
+      val keytabFileName = f.getName + "-" + UUID.randomUUID().toString
+      sparkConf.set(KEYTAB.key, keytabFileName)
+      sparkConf.set(PRINCIPAL.key, principal)
+    }
+    // Defensive copy of the credentials
+    credentials = new Credentials(UserGroupInformation.getCurrentUser.getCredentials)
+  }
+
+  /**
+   * Report the state of an application until it has exited, either successfully or
+   * due to some failure, then return a pair of the yarn application state (FINISHED, FAILED,
+   * KILLED, or RUNNING) and the final application state (UNDEFINED, SUCCEEDED, FAILED,
+   * or KILLED).
+   *
+   * @param appId ID of the application to monitor.
+   * @param returnOnRunning Whether to also return the application state when it is RUNNING.
+   * @param logApplicationReport Whether to log details of the application report every iteration.
+   * @return A pair of the yarn application state and the final application state.
+   */
+  def monitorApplication(
+      appId: ApplicationId,
+      returnOnRunning: Boolean = false,
+      logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus) = {
+    val interval = sparkConf.get(REPORT_INTERVAL)
+    var lastState: YarnApplicationState = null
+    while (true) {
+      Thread.sleep(interval)
+      val report: ApplicationReport =
+        try {
+          getApplicationReport(appId)
+        } catch {
+          case e: ApplicationNotFoundException =>
+            logError(s"Application $appId not found.")
+            cleanupStagingDir(appId)
+            return (YarnApplicationState.KILLED, FinalApplicationStatus.KILLED)
+          case NonFatal(e) =>
+            logError(s"Failed to contact YARN for application $appId.", e)
+            // Don't necessarily clean up staging dir because status is unknown
+            return (YarnApplicationState.FAILED, FinalApplicationStatus.FAILED)
+        }
+      val state = report.getYarnApplicationState
+
+      if (logApplicationReport) {
+        logInfo(s"Application report for $appId (state: $state)")
+
+        // If DEBUG is enabled, log report details every iteration
+        // Otherwise, log them every time the application changes state
+        if (log.isDebugEnabled) {
+          logDebug(formatReportDetails(report))
+        } else if (lastState != state) {
+          logInfo(formatReportDetails(report))
+        }
+      }
+
+      if (lastState != state) {
+        state match {
+          case YarnApplicationState.RUNNING =>
+            reportLauncherState(SparkAppHandle.State.RUNNING)
+          case YarnApplicationState.FINISHED =>
+            report.getFinalApplicationStatus match {
+              case FinalApplicationStatus.FAILED =>
+                reportLauncherState(SparkAppHandle.State.FAILED)
+              case FinalApplicationStatus.KILLED =>
+                reportLauncherState(SparkAppHandle.State.KILLED)
+              case _ =>
+                reportLauncherState(SparkAppHandle.State.FINISHED)
+            }
+          case YarnApplicationState.FAILED =>
+            reportLauncherState(SparkAppHandle.State.FAILED)
+          case YarnApplicationState.KILLED =>
+            reportLauncherState(SparkAppHandle.State.KILLED)
+          case _ =>
+        }
+      }
+
+      if (state == YarnApplicationState.FINISHED ||
+        state == YarnApplicationState.FAILED ||
+        state == YarnApplicationState.KILLED) {
+        cleanupStagingDir(appId)
+        return (state, report.getFinalApplicationStatus)
+      }
+
+      if (returnOnRunning && state == YarnApplicationState.RUNNING) {
+        return (state, report.getFinalApplicationStatus)
+      }
+
+      lastState = state
+    }
+
+    // Never reached, but keeps compiler happy
+    throw new SparkException("While loop is depleted! This should never happen...")
+  }
+
+  private def formatReportDetails(report: ApplicationReport): String = {
+    val details = Seq[(String, String)](
+      ("client token", getClientToken(report)),
+      ("diagnostics", report.getDiagnostics),
+      ("ApplicationMaster host", report.getHost),
+      ("ApplicationMaster RPC port", report.getRpcPort.toString),
+      ("queue", report.getQueue),
+      ("start time", report.getStartTime.toString),
+      ("final status", report.getFinalApplicationStatus.toString),
+      ("tracking URL", report.getTrackingUrl),
+      ("user", report.getUser)
+    )
+
+    // Use more loggable format if value is null or empty
+    details.map { case (k, v) =>
+      val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A")
+      s"\n\t $k: $newValue"
+    }.mkString("")
+  }
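+
+  // Sample of the formatted output (placeholder values), one "\t key: value" pair per line:
+  //     client token: N/A
+  //     diagnostics: N/A
+  //     ApplicationMaster host: 10.0.0.1
+  //     ...
+  //     user: spark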
+
+  /**
+   * Submit an application to the ResourceManager.
+   * If spark.yarn.submit.waitAppCompletion is set to true, the client stays alive
+   * reporting the application's status until the application has exited for any reason.
+   * Otherwise, the client process exits after submission.
+   * If the application finishes with a failed, killed, or undefined status,
+   * throw an appropriate SparkException.
+   */
+  def run(): Unit = {
+    this.appId = submitApplication()
+    if (!launcherBackend.isConnected() && fireAndForget) {
+      val report = getApplicationReport(appId)
+      val state = report.getYarnApplicationState
+      logInfo(s"Application report for $appId (state: $state)")
+      logInfo(formatReportDetails(report))
+      if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
+        throw new SparkException(s"Application $appId finished with status: $state")
+      }
+    } else {
+      val (yarnApplicationState, finalApplicationStatus) = monitorApplication(appId)
+      if (yarnApplicationState == YarnApplicationState.FAILED ||
+        finalApplicationStatus == FinalApplicationStatus.FAILED) {
+        throw new SparkException(s"Application $appId finished with failed status")
+      }
+      if (yarnApplicationState == YarnApplicationState.KILLED ||
+        finalApplicationStatus == FinalApplicationStatus.KILLED) {
+        throw new SparkException(s"Application $appId is killed")
+      }
+      if (finalApplicationStatus == FinalApplicationStatus.UNDEFINED) {
+        throw new SparkException(s"The final status of application $appId is undefined")
+      }
+    }
+  }
+
+  private def findPySparkArchives(): Seq[String] = {
+    sys.env.get("PYSPARK_ARCHIVES_PATH")
+      .map(_.split(",").toSeq)
+      .getOrElse {
+        val pyLibPath = Seq(sys.env("SPARK_HOME"), "python", "lib").mkString(File.separator)
+        val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
+        require(pyArchivesFile.exists(),
+          s"$pyArchivesFile not found; cannot run pyspark application in YARN mode.")
+        val py4jFile = new File(pyLibPath, "py4j-0.10.4-src.zip")
+        require(py4jFile.exists(),
+          s"$py4jFile not found; cannot run pyspark application in YARN mode.")
+        Seq(pyArchivesFile.getAbsolutePath(), py4jFile.getAbsolutePath())
+      }
+  }
+
+}
+
+private object Client extends Logging {
+
+  def main(argStrings: Array[String]) {
+    if (!sys.props.contains("SPARK_SUBMIT")) {
+      logWarning("WARNING: This client is deprecated and will be removed in a " +
+        "future version of Spark. Use ./bin/spark-submit with \"--master yarn\"")
+    }
+
+    // Set a system property indicating we are running in YARN mode.
+    // Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes.
+    System.setProperty("SPARK_YARN_MODE", "true")
+    val sparkConf = new SparkConf
+    // SparkSubmit uses the YARN cache to distribute files and jars in yarn mode,
+    // so remove them from sparkConf here.
+    sparkConf.remove("spark.jars")
+    sparkConf.remove("spark.files")
+    val args = new ClientArguments(argStrings)
+    new Client(args, sparkConf).run()
+  }
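+
+  // The supported entry point is spark-submit rather than invoking this main() directly, e.g.
+  // (illustrative command line):
+  //   ./bin/spark-submit --master yarn --deploy-mode cluster --class com.example.Main app.jar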
+
+  // Alias for the user jar
+  val APP_JAR_NAME: String = "__app__.jar"
+
+  // URI scheme that identifies local resources
+  val LOCAL_SCHEME = "local"
+
+  // Staging directory for any temporary jars or files
+  val SPARK_STAGING: String = ".sparkStaging"
+
+  // Staging directory is private! -> rwx------
+  val STAGING_DIR_PERMISSION: FsPermission =
+    FsPermission.createImmutable(Integer.parseInt("700", 8).toShort)
+
+  // App files are world-readable and owner-writable -> rw-r--r--
+  val APP_FILE_PERMISSION: FsPermission =
+    FsPermission.createImmutable(Integer.parseInt("644", 8).toShort)
+
+  // Distribution-defined classpath to add to processes
+  val ENV_DIST_CLASSPATH = "SPARK_DIST_CLASSPATH"
+
+  // Subdirectory where the user's Spark and Hadoop config files will be placed.
+  val LOCALIZED_CONF_DIR = "__spark_conf__"
+
+  // File containing the conf archive in the AM. See prepareLocalResources().
+  val LOCALIZED_CONF_ARCHIVE = LOCALIZED_CONF_DIR + ".zip"
+
+  // Name of the file in the conf archive containing Spark configuration.
+  val SPARK_CONF_FILE = "__spark_conf__.properties"
+
+  // Subdirectory where the user's python files (not archives) will be placed.
+  val LOCALIZED_PYTHON_DIR = "__pyfiles__"
+
+  // Subdirectory where Spark libraries will be placed.
+  val LOCALIZED_LIB_DIR = "__spark_libs__"
+
+  /**
+   * Return the path to the given application's staging directory.
+   */
+  private def getAppStagingDir(appId: ApplicationId): String = {
+    buildPath(SPARK_STAGING, appId.toString())
+  }
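+
+  // e.g. getAppStagingDir(appId) => ".sparkStaging/application_1480000000000_0001" (made-up id),
+  // resolved against spark.yarn.stagingDir or the submitting user's home directory.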
+
+  /**
+   * Populate the classpath entry in the given environment map with any application
+   * classpath specified through the Hadoop and Yarn configurations.
+   */
+  private[yarn] def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String])
+    : Unit = {
+    val classPathElementsToAdd = getYarnAppClasspath(conf) ++ getMRAppClasspath(conf)
+    for (c <- classPathElementsToAdd.flatten) {
+      YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, c.trim)
+    }
+  }
+
+  private def getYarnAppClasspath(conf: Configuration): Option[Seq[String]] =
+    Option(conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH)) match {
+      case Some(s) => Some(s.toSeq)
+      case None => getDefaultYarnApplicationClasspath
+    }
+
+  private def getMRAppClasspath(conf: Configuration): Option[Seq[String]] =
+    Option(conf.getStrings("mapreduce.application.classpath")) match {
+      case Some(s) => Some(s.toSeq)
+      case None => getDefaultMRApplicationClasspath
+    }
+
+  private[yarn] def getDefaultYarnApplicationClasspath: Option[Seq[String]] = {
+    val triedDefault = Try[Seq[String]] {
+      val field = classOf[YarnConfiguration].getField("DEFAULT_YARN_APPLICATION_CLASSPATH")
+      val value = field.get(null).asInstanceOf[Array[String]]
+      value.toSeq
+    } recoverWith {
+      case e: NoSuchFieldException => Success(Seq.empty[String])
+    }
+
+    triedDefault match {
+      case f: Failure[_] =>
+        logError("Unable to obtain the default YARN Application classpath.", f.exception)
+      case s: Success[Seq[String]] =>
+        logDebug(s"Using the default YARN application classpath: ${s.get.mkString(",")}")
+    }
+
+    triedDefault.toOption
+  }
+
+  private[yarn] def getDefaultMRApplicationClasspath: Option[Seq[String]] = {
+    val triedDefault = Try[Seq[String]] {
+      val field = classOf[MRJobConfig].getField("DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH")
+      StringUtils.getStrings(field.get(null).asInstanceOf[String]).toSeq
+    } recoverWith {
+      case e: NoSuchFieldException => Success(Seq.empty[String])
+    }
+
+    triedDefault match {
+      case f: Failure[_] =>
+        logError("Unable to obtain the default MR Application classpath.", f.exception)
+      case s: Success[Seq[String]] =>
+        logDebug(s"Using the default MR application classpath: ${s.get.mkString(",")}")
+    }
+
+    triedDefault.toOption
+  }
+
+  /**
+   * Populate the classpath entry in the given environment map.
+   *
+   * User jars are generally not added to the JVM's system classpath; those are handled by the AM
+   * and executor backend. When the deprecated `spark.yarn.user.classpath.first` is used, user jars
+   * are included in the system classpath, though. The extra class path and other uploaded files are
+   * always made available through the system class path.
+   *
+   * @param args Client arguments (when starting the AM) or null (when starting executors).
+   */
+  private[yarn] def populateClasspath(
+      args: ClientArguments,
+      conf: Configuration,
+      sparkConf: SparkConf,
+      env: HashMap[String, String],
+      extraClassPath: Option[String] = None): Unit = {
+    extraClassPath.foreach { cp =>
+      addClasspathEntry(getClusterPath(sparkConf, cp), env)
+    }
+
+    addClasspathEntry(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env)
+
+    addClasspathEntry(
+      YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR +
+        LOCALIZED_CONF_DIR, env)
+
+    if (sparkConf.get(USER_CLASS_PATH_FIRST)) {
+      // In order to properly add the app jar when the user classpath is first,
+      // handle the main jar separately so that the right name is passed
+      // to addFileToClasspath.
+      val mainJar =
+        if (args != null) {
+          getMainJarUri(Option(args.userJar))
+        } else {
+          getMainJarUri(sparkConf.get(APP_JAR))
+        }
+      mainJar.foreach(addFileToClasspath(sparkConf, conf, _, APP_JAR_NAME, env))
+
+      val secondaryJars =
+        if (args != null) {
+          getSecondaryJarUris(Option(sparkConf.get(JARS_TO_DISTRIBUTE)))
+        } else {
+          getSecondaryJarUris(sparkConf.get(SECONDARY_JARS))
+        }
+      secondaryJars.foreach { x =>
+        addFileToClasspath(sparkConf, conf, x, null, env)
+      }
+    }
+
+    // Add the Spark jars to the classpath, depending on how they were distributed.
+    addClasspathEntry(buildPath(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
+      LOCALIZED_LIB_DIR, "*"), env)
+    if (sparkConf.get(SPARK_ARCHIVE).isEmpty) {
+      sparkConf.get(SPARK_JARS).foreach { jars =>
+        jars.filter(isLocalUri).foreach { jar =>
+          addClasspathEntry(getClusterPath(sparkConf, jar), env)
+        }
+      }
+    }
+
+    populateHadoopClasspath(conf, env)
+    sys.env.get(ENV_DIST_CLASSPATH).foreach { cp =>
+      addClasspathEntry(getClusterPath(sparkConf, cp), env)
+    }
+  }
+
+  /**
+   * Returns a list of URIs representing the user classpath.
+   *
+   * @param conf Spark configuration.
+   */
+  def getUserClasspath(conf: SparkConf): Array[URI] = {
+    val mainUri = getMainJarUri(conf.get(APP_JAR))
+    val secondaryUris = getSecondaryJarUris(conf.get(SECONDARY_JARS))
+    (mainUri ++ secondaryUris).toArray
+  }
+
+  private def getMainJarUri(mainJar: Option[String]): Option[URI] = {
+    mainJar.flatMap { path =>
+      val uri = Utils.resolveURI(path)
+      if (uri.getScheme == LOCAL_SCHEME) Some(uri) else None
+    }.orElse(Some(new URI(APP_JAR_NAME)))
+  }
+
+  private def getSecondaryJarUris(secondaryJars: Option[Seq[String]]): Seq[URI] = {
+    secondaryJars.getOrElse(Nil).map(new URI(_))
+  }
+
+  /**
+   * Adds the given path to the classpath, handling "local:" URIs correctly.
+   *
+   * If an alternate name for the file is given, and it's not a "local:" file, the alternate
+   * name will be added to the classpath (relative to the job's work directory).
+   *
+   * If it's not a "local:" file and no alternate name is given, the link name derived from the
+   * URI fragment (or, failing that, the file name) is added to the classpath instead.
+   *
+   * @param conf        Spark configuration.
+   * @param hadoopConf  Hadoop configuration.
+   * @param uri         URI to add to classpath (optional).
+   * @param fileName    Alternate name for the file (optional).
+   * @param env         Map holding the environment variables.
+   */
+  private def addFileToClasspath(
+      conf: SparkConf,
+      hadoopConf: Configuration,
+      uri: URI,
+      fileName: String,
+      env: HashMap[String, String]): Unit = {
+    if (uri != null && uri.getScheme == LOCAL_SCHEME) {
+      addClasspathEntry(getClusterPath(conf, uri.getPath), env)
+    } else if (fileName != null) {
+      addClasspathEntry(buildPath(
+        YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), fileName), env)
+    } else if (uri != null) {
+      val localPath = getQualifiedLocalPath(uri, hadoopConf)
+      val linkName = Option(uri.getFragment()).getOrElse(localPath.getName())
+      addClasspathEntry(buildPath(
+        YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), linkName), env)
+    }
+  }
+
+  /**
+   * Add the given path to the classpath entry of the given environment map.
+   * If the classpath is already set, this appends the new path to the existing classpath.
+   */
+  private def addClasspathEntry(path: String, env: HashMap[String, String]): Unit =
+    YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, path)
+
+  /**
+   * Returns the path to be sent to the NM for a path that is valid on the gateway.
+   *
+   * This method uses two configuration values:
+   *
+   *  - spark.yarn.config.gatewayPath: a string that identifies a portion of the input path that may
+   *    only be valid on the gateway node.
+   *  - spark.yarn.config.replacementPath: a string with which to replace the gateway path. This may
+   *    contain, for example, env variable references, which will be expanded by the NMs when
+   *    starting containers.
+   *
+   * If either config is not available, the input path is returned.
+   */
+  def getClusterPath(conf: SparkConf, path: String): String = {
+    val localPath = conf.get(GATEWAY_ROOT_PATH)
+    val clusterPath = conf.get(REPLACEMENT_ROOT_PATH)
+    if (localPath != null && clusterPath != null) {
+      path.replace(localPath, clusterPath)
+    } else {
+      path
+    }
+  }
+
+  /**
+   * Return whether the two file systems are the same.
+   */
+  private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
+    val srcUri = srcFs.getUri()
+    val dstUri = destFs.getUri()
+    if (srcUri.getScheme() == null || srcUri.getScheme() != dstUri.getScheme()) {
+      return false
+    }
+
+    var srcHost = srcUri.getHost()
+    var dstHost = dstUri.getHost()
+
+    // In HA or when using viewfs, the host part of the URI may not actually be a host, but the
+    // name of the HDFS namespace. Those names won't resolve, so avoid even trying if they
+    // match.
+    if (srcHost != null && dstHost != null && srcHost != dstHost) {
+      try {
+        srcHost = InetAddress.getByName(srcHost).getCanonicalHostName()
+        dstHost = InetAddress.getByName(dstHost).getCanonicalHostName()
+      } catch {
+        case e: UnknownHostException =>
+          return false
+      }
+    }
+
+    Objects.equal(srcHost, dstHost) && srcUri.getPort() == dstUri.getPort()
+  }
+
+  /**
+   * Given a local URI, resolve it and return a qualified local path that corresponds to the URI.
+   * This is used for preparing local resources to be included in the container launch context.
+   */
+  private def getQualifiedLocalPath(localURI: URI, hadoopConf: Configuration): Path = {
+    val qualifiedURI =
+      if (localURI.getScheme == null) {
+        // If not specified, assume this is in the local filesystem to keep the behavior
+        // consistent with that of Hadoop
+        new URI(FileSystem.getLocal(hadoopConf).makeQualified(new Path(localURI)).toString)
+      } else {
+        localURI
+      }
+    new Path(qualifiedURI)
+  }
+
+  /**
+   * Whether to consider jars provided by the user to have precedence over the Spark jars when
+   * loading user classes.
+   */
+  def isUserClassPathFirst(conf: SparkConf, isDriver: Boolean): Boolean = {
+    if (isDriver) {
+      conf.get(DRIVER_USER_CLASS_PATH_FIRST)
+    } else {
+      conf.get(EXECUTOR_USER_CLASS_PATH_FIRST)
+    }
+  }
+
+  /**
+   * Joins all the path components using Path.SEPARATOR.
+   */
+  def buildPath(components: String*): String = {
+    components.mkString(Path.SEPARATOR)
+  }
+
+  /** Returns whether the URI is a "local:" URI. */
+  def isLocalUri(uri: String): Boolean = {
+    uri.startsWith(s"$LOCAL_SCHEME:")
+  }
+
+}
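
A side note on the getClusterPath helper above: the replacement it performs can be mirrored by the small standalone sketch below. Client is private to Spark, so this re-implements the logic rather than calling it, and the concrete paths and the {{SPARK_DIST}} placeholder are invented for the example.

    // Mirrors Client.getClusterPath without access to the private Client object.
    // Keys per the scaladoc: spark.yarn.config.gatewayPath / spark.yarn.config.replacementPath.
    def clusterPath(gatewayPath: Option[String], replacementPath: Option[String], path: String): String =
      (gatewayPath, replacementPath) match {
        case (Some(local), Some(cluster)) => path.replace(local, cluster)
        case _ => path // if either config is unset, the input path is returned untouched
      }

    // Hypothetical gateway layout and replacement token:
    clusterPath(Some("/opt/gateway/spark"), Some("{{SPARK_DIST}}"),
      "/opt/gateway/spark/jars/example.jar") // => "{{SPARK_DIST}}/jars/example.jar"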

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
new file mode 100644
index 0000000..61c027e
--- /dev/null
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.ArrayBuffer
+
+// TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware !
+private[spark] class ClientArguments(args: Array[String]) {
+
+  var userJar: String = null
+  var userClass: String = null
+  var primaryPyFile: String = null
+  var primaryRFile: String = null
+  var userArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
+
+  parseArgs(args.toList)
+
+  private def parseArgs(inputArgs: List[String]): Unit = {
+    var args = inputArgs
+
+    while (!args.isEmpty) {
+      args match {
+        case ("--jar") :: value :: tail =>
+          userJar = value
+          args = tail
+
+        case ("--class") :: value :: tail =>
+          userClass = value
+          args = tail
+
+        case ("--primary-py-file") :: value :: tail =>
+          primaryPyFile = value
+          args = tail
+
+        case ("--primary-r-file") :: value :: tail =>
+          primaryRFile = value
+          args = tail
+
+        case ("--arg") :: value :: tail =>
+          userArgs += value
+          args = tail
+
+        case Nil =>
+
+        case _ =>
+          throw new IllegalArgumentException(getUsageMessage(args))
+      }
+    }
+
+    if (primaryPyFile != null && primaryRFile != null) {
+      throw new IllegalArgumentException("Cannot have primary-py-file and primary-r-file" +
+        " at the same time")
+    }
+  }
+
+  private def getUsageMessage(unknownParam: List[String] = null): String = {
+    val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else ""
+    message +
+      s"""
+      |Usage: org.apache.spark.deploy.yarn.Client [options]
+      |Options:
+      |  --jar JAR_PATH           Path to your application's JAR file (required in yarn-cluster
+      |                           mode)
+      |  --class CLASS_NAME       Name of your application's main class (required)
+      |  --primary-py-file        A main Python file
+      |  --primary-r-file         A main R file
+      |  --arg ARG                Argument to be passed to your application's main class.
+      |                           Multiple invocations are possible; each will be passed in order.
+      """.stripMargin
+  }
+}
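
For orientation, this is roughly how the parser above consumes its flag list. ClientArguments is private[spark], so the snippet assumes it is compiled inside the org.apache.spark.deploy.yarn package, and the jar path, class name and arguments are hypothetical.

    // Hypothetical spark-submit-style flags; --arg may repeat and is accumulated in order.
    val parsed = new ClientArguments(Array(
      "--jar", "local:///opt/app/example.jar",
      "--class", "com.example.Main",
      "--arg", "first",
      "--arg", "second"))

    assert(parsed.userJar == "local:///opt/app/example.jar")
    assert(parsed.userClass == "com.example.Main")
    assert(parsed.userArgs.toList == List("first", "second"))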

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
new file mode 100644
index 0000000..dcc2288
--- /dev/null
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.net.URI
+
+import scala.collection.mutable.{HashMap, ListBuffer, Map}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.permission.FsAction
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.Logging
+
+private case class CacheEntry(
+  uri: URI,
+  size: Long,
+  modTime: Long,
+  visibility: LocalResourceVisibility,
+  resType: LocalResourceType)
+
+/** Client side methods to setup the Hadoop distributed cache */
+private[spark] class ClientDistributedCacheManager() extends Logging {
+
+  private val distCacheEntries = new ListBuffer[CacheEntry]()
+
+  /**
+   * Add a resource to the list of distributed cache resources. This list can
+   * be sent to the ApplicationMaster and possibly the executors so that it can
+   * be downloaded into the Hadoop distributed cache for use by this application.
+   * Adds the LocalResource to the localResources HashMap passed in and saves
+   * the stats of the resources so they can be sent to the executors and verified.
+   *
+   * @param fs FileSystem
+   * @param conf Configuration
+   * @param destPath path to the resource
+   * @param localResources localResource hashMap to insert the resource into
+   * @param resourceType LocalResourceType
+   * @param link link presented in the distributed cache to the destination
+   * @param statCache cache to store the file/directory stats
+   * @param appMasterOnly Whether to only add the resource to the app master
+   */
+  def addResource(
+      fs: FileSystem,
+      conf: Configuration,
+      destPath: Path,
+      localResources: HashMap[String, LocalResource],
+      resourceType: LocalResourceType,
+      link: String,
+      statCache: Map[URI, FileStatus],
+      appMasterOnly: Boolean = false): Unit = {
+    val destStatus = fs.getFileStatus(destPath)
+    val amJarRsrc = Records.newRecord(classOf[LocalResource])
+    amJarRsrc.setType(resourceType)
+    val visibility = getVisibility(conf, destPath.toUri(), statCache)
+    amJarRsrc.setVisibility(visibility)
+    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(destPath))
+    amJarRsrc.setTimestamp(destStatus.getModificationTime())
+    amJarRsrc.setSize(destStatus.getLen())
+    require(link != null && link.nonEmpty, "You must specify a valid link name.")
+    localResources(link) = amJarRsrc
+
+    if (!appMasterOnly) {
+      val uri = destPath.toUri()
+      val pathURI = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), null, link)
+      distCacheEntries += CacheEntry(pathURI, destStatus.getLen(), destStatus.getModificationTime(),
+        visibility, resourceType)
+    }
+  }
+
+  /**
+   * Writes information about the cached files needed by executors into the given configuration.
+   */
+  def updateConfiguration(conf: SparkConf): Unit = {
+    conf.set(CACHED_FILES, distCacheEntries.map(_.uri.toString))
+    conf.set(CACHED_FILES_SIZES, distCacheEntries.map(_.size))
+    conf.set(CACHED_FILES_TIMESTAMPS, distCacheEntries.map(_.modTime))
+    conf.set(CACHED_FILES_VISIBILITIES, distCacheEntries.map(_.visibility.name()))
+    conf.set(CACHED_FILES_TYPES, distCacheEntries.map(_.resType.name()))
+  }
+
+  /**
+   * Returns the local resource visibility depending on the cache file permissions
+   * @return LocalResourceVisibility
+   */
+  private[yarn] def getVisibility(
+      conf: Configuration,
+      uri: URI,
+      statCache: Map[URI, FileStatus]): LocalResourceVisibility = {
+    if (isPublic(conf, uri, statCache)) {
+      LocalResourceVisibility.PUBLIC
+    } else {
+      LocalResourceVisibility.PRIVATE
+    }
+  }
+
+  /**
+   * Returns a boolean to denote whether a cache file is visible to all (public)
+   * @return true if the path in the uri is visible to all, false otherwise
+   */
+  private def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): Boolean = {
+    val fs = FileSystem.get(uri, conf)
+    val current = new Path(uri.getPath())
+    // the leaf level file should be readable by others
+    if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) {
+      return false
+    }
+    ancestorsHaveExecutePermissions(fs, current.getParent(), statCache)
+  }
+
+  /**
+   * Returns true if all ancestors of the specified path have the 'execute'
+   * permission set for all users (i.e. that other users can traverse
+   * the directory hierarchy to the given path)
+   * @return true if all ancestors have the 'execute' permission set for all users
+   */
+  private def ancestorsHaveExecutePermissions(
+      fs: FileSystem,
+      path: Path,
+      statCache: Map[URI, FileStatus]): Boolean = {
+    var current = path
+    while (current != null) {
+      // the subdirs in the path should have execute permissions for others
+      if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) {
+        return false
+      }
+      current = current.getParent()
+    }
+    true
+  }
+
+  /**
+   * Checks whether the "other" permissions of the given path
+   * imply the permission in the passed FsAction.
+   * @return true if the path in the uri is visible to all, false otherwise
+   */
+  private def checkPermissionOfOther(
+      fs: FileSystem,
+      path: Path,
+      action: FsAction,
+      statCache: Map[URI, FileStatus]): Boolean = {
+    val status = getFileStatus(fs, path.toUri(), statCache)
+    val perms = status.getPermission()
+    val otherAction = perms.getOtherAction()
+    otherAction.implies(action)
+  }
+
+  /**
+   * Checks whether the given URI exists in the cache. If it does, the existing FileStatus is
+   * returned; otherwise the URI is stat'ed, the result stored in the cache, and the new
+   * FileStatus returned.
+   * @return FileStatus
+   */
+  private[yarn] def getFileStatus(
+      fs: FileSystem,
+      uri: URI,
+      statCache: Map[URI, FileStatus]): FileStatus = {
+    val stat = statCache.get(uri) match {
+      case Some(existstat) => existstat
+      case None =>
+        val newStat = fs.getFileStatus(new Path(uri))
+        statCache.put(uri, newStat)
+        newStat
+    }
+    stat
+  }
+}
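
A note on the visibility logic above: a cached resource ends up PUBLIC only when its "other" permissions allow READ and every ancestor directory allows EXECUTE for others; otherwise it is PRIVATE. Below is a minimal sketch of that rule over plain FsPermission values, with no FileSystem or statCache involved and the permissions invented for the example.

    import org.apache.hadoop.fs.permission.{FsAction, FsPermission}

    // Hypothetical permissions: the file is rw-r--r--, its ancestor directories are rwxr-xr-x.
    val filePerm = FsPermission.createImmutable(Integer.parseInt("644", 8).toShort)
    val dirPerms = Seq.fill(3)(FsPermission.createImmutable(Integer.parseInt("755", 8).toShort))

    // Mirrors isPublic + ancestorsHaveExecutePermissions, minus the HDFS plumbing.
    val publiclyVisible =
      filePerm.getOtherAction.implies(FsAction.READ) &&
        dirPerms.forall(_.getOtherAction.implies(FsAction.EXECUTE))
    // publiclyVisible == true here; a single rwxr-x--- ancestor would make the resource PRIVATE.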

http://git-wip-us.apache.org/repos/asf/spark/blob/81e5619c/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
new file mode 100644
index 0000000..868c2ed
--- /dev/null
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.io.File
+import java.nio.ByteBuffer
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{HashMap, ListBuffer}
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.DataOutputBuffer
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api._
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.client.api.NMClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.ipc.YarnRPC
+import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+
+import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.launcher.YarnCommandBuilderUtils
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.util.Utils
+
+private[yarn] class ExecutorRunnable(
+    container: Option[Container],
+    conf: YarnConfiguration,
+    sparkConf: SparkConf,
+    masterAddress: String,
+    executorId: String,
+    hostname: String,
+    executorMemory: Int,
+    executorCores: Int,
+    appId: String,
+    securityMgr: SecurityManager,
+    localResources: Map[String, LocalResource]) extends Logging {
+
+  var rpc: YarnRPC = YarnRPC.create(conf)
+  var nmClient: NMClient = _
+
+  def run(): Unit = {
+    logDebug("Starting Executor Container")
+    nmClient = NMClient.createNMClient()
+    nmClient.init(conf)
+    nmClient.start()
+    startContainer()
+  }
+
+  def launchContextDebugInfo(): String = {
+    val commands = prepareCommand()
+    val env = prepareEnvironment()
+
+    s"""
+    |===============================================================================
+    |YARN executor launch context:
+    |  env:
+    |${Utils.redact(sparkConf, env.toSeq).map { case (k, v) => s"    $k -> $v\n" }.mkString}
+    |  command:
+    |    ${commands.mkString(" \\ \n      ")}
+    |
+    |  resources:
+    |${localResources.map { case (k, v) => s"    $k -> $v\n" }.mkString}
+    |===============================================================================""".stripMargin
+  }
+
+  def startContainer(): java.util.Map[String, ByteBuffer] = {
+    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
+      .asInstanceOf[ContainerLaunchContext]
+    val env = prepareEnvironment().asJava
+
+    ctx.setLocalResources(localResources.asJava)
+    ctx.setEnvironment(env)
+
+    val credentials = UserGroupInformation.getCurrentUser().getCredentials()
+    val dob = new DataOutputBuffer()
+    credentials.writeTokenStorageToStream(dob)
+    ctx.setTokens(ByteBuffer.wrap(dob.getData()))
+
+    val commands = prepareCommand()
+
+    ctx.setCommands(commands.asJava)
+    ctx.setApplicationACLs(
+      YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr).asJava)
+
+    // If external shuffle service is enabled, register with the Yarn shuffle service already
+    // started on the NodeManager and, if authentication is enabled, provide it with our secret
+    // key for fetching shuffle files later
+    if (sparkConf.get(SHUFFLE_SERVICE_ENABLED)) {
+      val secretString = securityMgr.getSecretKey()
+      val secretBytes =
+        if (secretString != null) {
+          // This conversion must match how the YarnShuffleService decodes our secret
+          JavaUtils.stringToBytes(secretString)
+        } else {
+          // Authentication is not enabled, so just provide dummy metadata
+          ByteBuffer.allocate(0)
+        }
+      ctx.setServiceData(Collections.singletonMap("spark_shuffle", secretBytes))
+    }
+
+    // Send the start request to the ContainerManager
+    try {
+      nmClient.startContainer(container.get, ctx)
+    } catch {
+      case ex: Exception =>
+        throw new SparkException(s"Exception while starting container ${container.get.getId}" +
+          s" on host $hostname", ex)
+    }
+  }
+
+  private def prepareCommand(): List[String] = {
+    // Extra options for the JVM
+    val javaOpts = ListBuffer[String]()
+
+    // Set the environment variable through a command prefix
+    // to append to the existing value of the variable
+    var prefixEnv: Option[String] = None
+
+    // Set the JVM memory
+    val executorMemoryString = executorMemory + "m"
+    javaOpts += "-Xmx" + executorMemoryString
+
+    // Set extra Java options for the executor, if defined
+    sparkConf.get(EXECUTOR_JAVA_OPTIONS).foreach { opts =>
+      javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell)
+    }
+    sys.env.get("SPARK_JAVA_OPTS").foreach { opts =>
+      javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell)
+    }
+    sparkConf.get(EXECUTOR_LIBRARY_PATH).foreach { p =>
+      prefixEnv = Some(Client.getClusterPath(sparkConf, Utils.libraryPathEnvPrefix(Seq(p))))
+    }
+
+    javaOpts += "-Djava.io.tmpdir=" +
+      new Path(
+        YarnSparkHadoopUtil.expandEnvironment(Environment.PWD),
+        YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR
+      )
+
+    // Certain configs need to be passed here because they are needed before the Executor
+    // registers with the Scheduler and transfers the spark configs. Since the Executor backend
+    // uses RPC to connect to the scheduler, the RPC settings are needed as well as the
+    // authentication settings.
+    sparkConf.getAll
+      .filter { case (k, v) => SparkConf.isExecutorStartupConf(k) }
+      .foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }
+
+    // Commented out for now so that people can refer to the properties if required. Remove it
+    // once the cpuset version is pushed out.
+    // The context: the default GC for server-class machines ends up using all cores to do GC,
+    // so if there are multiple containers on the same node, Spark's GC affects the performance
+    // of all other containers (which may themselves be other Spark containers).
+    // Instead of using this, rely on cpusets in YARN to ensure Spark behaves 'properly' in
+    // multi-tenant environments. It is unclear how the default Java GC behaves if it is limited
+    // to a subset of cores on a node.
+    /*
+        else {
+          // If no java_opts specified, default to using -XX:+CMSIncrementalMode
+          // It might be possible that other modes/config is being done in
+          // spark.executor.extraJavaOptions, so we don't want to mess with it.
+          // In our expts, using (default) throughput collector has severe perf ramifications in
+          // multi-tenant machines
+          // The options are based on
+          // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use
+          // %20the%20Concurrent%20Low%20Pause%20Collector|outline
+          javaOpts += "-XX:+UseConcMarkSweepGC"
+          javaOpts += "-XX:+CMSIncrementalMode"
+          javaOpts += "-XX:+CMSIncrementalPacing"
+          javaOpts += "-XX:CMSIncrementalDutyCycleMin=0"
+          javaOpts += "-XX:CMSIncrementalDutyCycle=10"
+        }
+    */
+
+    // For log4j configuration to reference
+    javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)
+    YarnCommandBuilderUtils.addPermGenSizeOpt(javaOpts)
+
+    val userClassPath = Client.getUserClasspath(sparkConf).flatMap { uri =>
+      val absPath =
+        if (new File(uri.getPath()).isAbsolute()) {
+          Client.getClusterPath(sparkConf, uri.getPath())
+        } else {
+          Client.buildPath(Environment.PWD.$(), uri.getPath())
+        }
+      Seq("--user-class-path", "file:" + absPath)
+    }.toSeq
+
+    YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
+    val commands = prefixEnv ++ Seq(
+      YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java",
+      "-server") ++
+      javaOpts ++
+      Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
+        "--driver-url", masterAddress,
+        "--executor-id", executorId,
+        "--hostname", hostname,
+        "--cores", executorCores.toString,
+        "--app-id", appId) ++
+      userClassPath ++
+      Seq(
+        s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
+        s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")
+
+    // TODO: it would be nicer to just make sure there are no null commands here
+    commands.map(s => if (s == null) "null" else s).toList
+  }
+
+  private def prepareEnvironment(): HashMap[String, String] = {
+    val env = new HashMap[String, String]()
+    Client.populateClasspath(null, conf, sparkConf, env, sparkConf.get(EXECUTOR_CLASS_PATH))
+
+    sparkConf.getExecutorEnv.foreach { case (key, value) =>
+      // This assumes each executor environment variable set here is a path
+      // This is kept for backward compatibility and consistency with hadoop
+      YarnSparkHadoopUtil.addPathToEnvironment(env, key, value)
+    }
+
+    // Keep this for backwards compatibility but users should move to the config
+    sys.env.get("SPARK_YARN_USER_ENV").foreach { userEnvs =>
+      YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs)
+    }
+
+    // Look up the appropriate HTTP scheme for container log URLs.
+    val yarnHttpPolicy = conf.get(
+      YarnConfiguration.YARN_HTTP_POLICY_KEY,
+      YarnConfiguration.YARN_HTTP_POLICY_DEFAULT
+    )
+    val httpScheme = if (yarnHttpPolicy == "HTTPS_ONLY") "https://" else "http://"
+
+    // Add log urls
+    container.foreach { c =>
+      sys.env.get("SPARK_USER").foreach { user =>
+        val containerId = ConverterUtils.toString(c.getId)
+        val address = c.getNodeHttpAddress
+        val baseUrl = s"$httpScheme$address/node/containerlogs/$containerId/$user"
+
+        env("SPARK_LOG_URL_STDERR") = s"$baseUrl/stderr?start=-4096"
+        env("SPARK_LOG_URL_STDOUT") = s"$baseUrl/stdout?start=-4096"
+      }
+    }
+
+    System.getenv().asScala.filterKeys(_.startsWith("SPARK"))
+      .foreach { case (k, v) => env(k) = v }
+    env
+  }
+}
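
To make the shape of what prepareCommand assembles a bit more concrete, a hand-written approximation is sketched below. The memory size, ids, host and driver URL are placeholders, the JVM options are abbreviated, and the real code escapes every element and routes paths through YARN's environment expansion.

    import org.apache.hadoop.yarn.api.ApplicationConstants

    // Illustrative only -- values are hypothetical, not what the real builder produces verbatim.
    val logDir = ApplicationConstants.LOG_DIR_EXPANSION_VAR
    val launchCommand = Seq(
      "{{JAVA_HOME}}/bin/java", "-server",
      "-Xmx2048m", // executorMemory + "m"
      // ...plus -Djava.io.tmpdir, extra executor Java options, and -D flags for
      // every config where SparkConf.isExecutorStartupConf(key) holds...
      "org.apache.spark.executor.CoarseGrainedExecutorBackend",
      "--driver-url", "spark://CoarseGrainedScheduler@driver-host:7077", // hypothetical
      "--executor-id", "1",
      "--hostname", "worker-1",
      "--cores", "4",
      "--app-id", "application_1480000000000_0001",
      s"1>$logDir/stdout",
      s"2>$logDir/stderr")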

