Posted to commits@spark.apache.org by va...@apache.org on 2019/01/25 19:52:56 UTC

[spark] branch master updated: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode

This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f06bc0c  [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode
f06bc0c is described below

commit f06bc0cd1dee2a58e04ebf24bf719a2f7ef2dc4e
Author: Devaraj K <de...@apache.org>
AuthorDate: Fri Jan 25 11:52:45 2019 -0800

    [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode
    
    ## What changes were proposed in this pull request?
    
    This adds a new configuration, "spark.yarn.unmanagedAM.enabled" (default: false), that enables an unmanaged Application Master (AM) in yarn-client mode, launching the AM service as part of the client process. It reuses the existing Application Master <-> Task Scheduler communication path for container requests, allocations, and launches, and eliminates the following (a minimal usage sketch follows this list):
    1. Allocating and launching a separate Application Master container
    2. Remote node/process communication between the Application Master and the Task Scheduler
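    
    For illustration only (not part of this patch), a minimal sketch of enabling the option from application code; it assumes a reachable YARN cluster, standard yarn-client settings, and a hypothetical app name:
    
    ```scala
    import org.apache.spark.{SparkConf, SparkContext}
    
    // Sketch: run in yarn-client mode with the unmanaged AM enabled, so the AM
    // service is started inside this client process instead of a YARN container.
    val conf = new SparkConf()
      .setMaster("yarn")
      .setAppName("unmanaged-am-example")            // hypothetical app name
      .set("spark.submit.deployMode", "client")
      .set("spark.yarn.unmanagedAM.enabled", "true") // config added in config.scala below
    
    val sc = new SparkContext(conf)
    try {
      // Any simple job exercises executor allocation through the in-process AM.
      println(sc.parallelize(1 to 100).sum())
    } finally {
      sc.stop()
    }
    ```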
    
    ## How was this patch tested?
    
    Verified manually by running applications in yarn-client mode with "spark.yarn.unmanagedAM.enabled" set to true, and confirmed that there is no impact on the existing execution flows.
    
    I would like to hear others' feedback/thoughts on this.
    
    Closes #19616 from devaraj-kavali/SPARK-22404.
    
    Authored-by: Devaraj K <de...@apache.org>
    Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
 .../spark/deploy/yarn/ApplicationMaster.scala      | 175 ++++++++++++++-------
 .../org/apache/spark/deploy/yarn/Client.scala      |  74 ++++++---
 .../spark/deploy/yarn/ExecutorRunnable.scala       |   2 +-
 .../org/apache/spark/deploy/yarn/config.scala      |   8 +
 .../cluster/YarnClientSchedulerBackend.scala       |   2 +-
 .../org/apache/spark/deploy/yarn/ClientSuite.scala |   6 +-
 .../spark/deploy/yarn/YarnClusterSuite.scala       |   4 +
 7 files changed, 193 insertions(+), 78 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 01b9188..7458e13 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException
+import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 
 import org.apache.spark._
@@ -54,36 +55,27 @@ import org.apache.spark.util._
 /**
  * Common application master functionality for Spark on Yarn.
  */
-private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends Logging {
+private[spark] class ApplicationMaster(
+    args: ApplicationMasterArguments,
+    sparkConf: SparkConf,
+    yarnConf: YarnConfiguration) extends Logging {
 
   // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
   // optimal as more containers are available. Might need to handle this better.
 
-  private val appAttemptId = YarnSparkHadoopUtil.getContainerId.getApplicationAttemptId()
-  private val isClusterMode = args.userClass != null
-
-  private val sparkConf = new SparkConf()
-  if (args.propertiesFile != null) {
-    Utils.getPropertiesFromFile(args.propertiesFile).foreach { case (k, v) =>
-      sparkConf.set(k, v)
+  private val appAttemptId =
+    if (System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()) != null) {
+      YarnSparkHadoopUtil.getContainerId.getApplicationAttemptId()
+    } else {
+      null
     }
-  }
+
+  private val isClusterMode = args.userClass != null
 
   private val securityMgr = new SecurityManager(sparkConf)
 
   private var metricsSystem: Option[MetricsSystem] = None
 
-  // Set system properties for each config entry. This covers two use cases:
-  // - The default configuration stored by the SparkHadoopUtil class
-  // - The user application creating a new SparkConf in cluster mode
-  //
-  // Both cases create a new SparkConf object which reads these configs from system properties.
-  sparkConf.getAll.foreach { case (k, v) =>
-    sys.props(k) = v
-  }
-
-  private val yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))
-
   private val userClassLoader = {
     val classpath = Client.getUserClasspath(sparkConf)
     val urls = classpath.map { entry =>
@@ -153,8 +145,6 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
   // Next wait interval before allocator poll.
   private var nextAllocationInterval = initialAllocationInterval
 
-  private var rpcEnv: RpcEnv = null
-
   // In cluster mode, used to tell the AM when the user's SparkContext has been initialized.
   private val sparkContextPromise = Promise[SparkContext]()
 
@@ -162,15 +152,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
    * Load the list of localized files set by the client, used when launching executors. This should
    * be called in a context where the needed credentials to access HDFS are available.
    */
-  private def prepareLocalResources(): Map[String, LocalResource] = {
+  private def prepareLocalResources(distCacheConf: SparkConf): Map[String, LocalResource] = {
     logInfo("Preparing Local resources")
-    val distCacheConf = new SparkConf(false)
-    if (args.distCacheConf != null) {
-      Utils.getPropertiesFromFile(args.distCacheConf).foreach { case (k, v) =>
-        distCacheConf.set(k, v)
-      }
-    }
-
     val resources = HashMap[String, LocalResource]()
 
     def setupDistributedCache(
@@ -267,7 +250,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
           // we only want to unregister if we don't want the RM to retry
           if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
             unregister(finalStatus, finalMsg)
-            cleanupStagingDir()
+            cleanupStagingDir(new Path(System.getenv("SPARK_YARN_STAGING_DIR")))
           }
         }
       }
@@ -299,6 +282,60 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     exitCode
   }
 
+  def runUnmanaged(
+      clientRpcEnv: RpcEnv,
+      appAttemptId: ApplicationAttemptId,
+      stagingDir: Path,
+      cachedResourcesConf: SparkConf): Unit = {
+    try {
+      new CallerContext(
+        "APPMASTER", sparkConf.get(APP_CALLER_CONTEXT),
+        Option(appAttemptId.getApplicationId.toString), None).setCurrentContext()
+
+      val driverRef = clientRpcEnv.setupEndpointRef(
+        RpcAddress(sparkConf.get("spark.driver.host"),
+          sparkConf.get("spark.driver.port").toInt),
+        YarnSchedulerBackend.ENDPOINT_NAME)
+      // The client-mode AM doesn't listen for incoming connections, so report an invalid port.
+      registerAM(Utils.localHostName, -1, sparkConf,
+        sparkConf.getOption("spark.driver.appUIAddress"), appAttemptId)
+      addAmIpFilter(Some(driverRef), ProxyUriUtils.getPath(appAttemptId.getApplicationId))
+      createAllocator(driverRef, sparkConf, clientRpcEnv, appAttemptId, cachedResourcesConf)
+      reporterThread.join()
+    } catch {
+      case e: Exception =>
+        // catch everything else if not specifically handled
+        logError("Uncaught exception: ", e)
+        finish(FinalApplicationStatus.FAILED,
+          ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
+          "Uncaught exception: " + StringUtils.stringifyException(e))
+        if (!unregistered) {
+          unregister(finalStatus, finalMsg)
+          cleanupStagingDir(stagingDir)
+        }
+    } finally {
+      try {
+        metricsSystem.foreach { ms =>
+          ms.report()
+          ms.stop()
+        }
+      } catch {
+        case e: Exception =>
+          logWarning("Exception during stopping of the metric system: ", e)
+      }
+    }
+  }
+
+  def stopUnmanaged(stagingDir: Path): Unit = {
+    if (!finished) {
+      finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
+    }
+    if (!unregistered) {
+      unregister(finalStatus, finalMsg)
+      cleanupStagingDir(stagingDir)
+    }
+  }
+
   /**
    * Set the default final application status for client mode to UNDEFINED to handle
    * if YARN HA restarts the application so that it properly retries. Set the final
@@ -376,9 +413,10 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
       host: String,
       port: Int,
       _sparkConf: SparkConf,
-      uiAddress: Option[String]): Unit = {
-    val appId = appAttemptId.getApplicationId().toString()
-    val attemptId = appAttemptId.getAttemptId().toString()
+      uiAddress: Option[String],
+      appAttempt: ApplicationAttemptId): Unit = {
+    val appId = appAttempt.getApplicationId().toString()
+    val attemptId = appAttempt.getAttemptId().toString()
     val historyAddress = ApplicationMaster
       .getHistoryServerAddress(_sparkConf, yarnConf, appId, attemptId)
 
@@ -386,7 +424,12 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     registered = true
   }
 
-  private def createAllocator(driverRef: RpcEndpointRef, _sparkConf: SparkConf): Unit = {
+  private def createAllocator(
+      driverRef: RpcEndpointRef,
+      _sparkConf: SparkConf,
+      rpcEnv: RpcEnv,
+      appAttemptId: ApplicationAttemptId,
+      distCacheConf: SparkConf): Unit = {
     // In client mode, the AM may be restarting after delegation tokens have reached their TTL. So
     // always contact the driver to get the current set of valid tokens, so that local resources can
     // be initialized below.
@@ -400,7 +443,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     val appId = appAttemptId.getApplicationId().toString()
     val driverUrl = RpcEndpointAddress(driverRef.address.host, driverRef.address.port,
       CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
-    val localResources = prepareLocalResources()
+    val localResources = prepareLocalResources(distCacheConf)
 
     // Before we initialize the allocator, let's log the information about how executors will
     // be run up front, to avoid printing this out for every single executor being launched.
@@ -438,7 +481,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
   }
 
   private def runDriver(): Unit = {
-    addAmIpFilter(None)
+    addAmIpFilter(None, System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV))
     userClassThread = startUserApplication()
 
     // This a bit hacky, but we need to wait until the spark.driver.port property has
@@ -449,17 +492,17 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
       val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
         Duration(totalWaitTime, TimeUnit.MILLISECONDS))
       if (sc != null) {
-        rpcEnv = sc.env.rpcEnv
+        val rpcEnv = sc.env.rpcEnv
 
         val userConf = sc.getConf
         val host = userConf.get(DRIVER_HOST_ADDRESS)
         val port = userConf.get(DRIVER_PORT)
-        registerAM(host, port, userConf, sc.ui.map(_.webUrl))
+        registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)
 
         val driverRef = rpcEnv.setupEndpointRef(
           RpcAddress(host, port),
           YarnSchedulerBackend.ENDPOINT_NAME)
-        createAllocator(driverRef, userConf)
+        createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)
       } else {
         // Sanity check; should never happen in normal operation, since sc should only be null
         // if the user app did not create a SparkContext.
@@ -483,11 +526,11 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
   private def runExecutorLauncher(): Unit = {
     val hostname = Utils.localHostName
     val amCores = sparkConf.get(AM_CORES)
-    rpcEnv = RpcEnv.create("sparkYarnAM", hostname, hostname, -1, sparkConf, securityMgr,
+    val rpcEnv = RpcEnv.create("sparkYarnAM", hostname, hostname, -1, sparkConf, securityMgr,
       amCores, true)
 
     // The client-mode AM doesn't listen for incoming connections, so report an invalid port.
-    registerAM(hostname, -1, sparkConf, sparkConf.get(DRIVER_APP_UI_ADDRESS))
+    registerAM(hostname, -1, sparkConf, sparkConf.get(DRIVER_APP_UI_ADDRESS), appAttemptId)
 
     // The driver should be up and listening, so unlike cluster mode, just try to connect to it
     // with no waiting or retrying.
@@ -495,8 +538,9 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     val driverRef = rpcEnv.setupEndpointRef(
       RpcAddress(driverHost, driverPort),
       YarnSchedulerBackend.ENDPOINT_NAME)
-    addAmIpFilter(Some(driverRef))
-    createAllocator(driverRef, sparkConf)
+    addAmIpFilter(Some(driverRef),
+      System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV))
+    createAllocator(driverRef, sparkConf, rpcEnv, appAttemptId, distCacheConf)
 
     // In client mode the actor will stop the reporter thread.
     reporterThread.join()
@@ -592,15 +636,23 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     t
   }
 
+  private def distCacheConf(): SparkConf = {
+    val distCacheConf = new SparkConf(false)
+    if (args.distCacheConf != null) {
+      Utils.getPropertiesFromFile(args.distCacheConf).foreach { case (k, v) =>
+        distCacheConf.set(k, v)
+      }
+    }
+    distCacheConf
+  }
+
   /**
    * Clean up the staging directory.
    */
-  private def cleanupStagingDir(): Unit = {
-    var stagingDirPath: Path = null
+  private def cleanupStagingDir(stagingDirPath: Path): Unit = {
     try {
       val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
       if (!preserveFiles) {
-        stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
         logInfo("Deleting staging directory " + stagingDirPath)
         val fs = stagingDirPath.getFileSystem(yarnConf)
         fs.delete(stagingDirPath, true)
@@ -612,8 +664,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
   }
 
   /** Add the Yarn IP filter that is required for properly securing the UI. */
-  private def addAmIpFilter(driver: Option[RpcEndpointRef]) = {
-    val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
+  private def addAmIpFilter(driver: Option[RpcEndpointRef], proxyBase: String) = {
     val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
     val params = client.getAmIpFilterParams(yarnConf, proxyBase)
     driver match {
@@ -743,9 +794,9 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     }
 
     override def onDisconnected(remoteAddress: RpcAddress): Unit = {
-      // In cluster mode, do not rely on the disassociated event to exit
+      // In cluster mode or unmanaged am case, do not rely on the disassociated event to exit
       // This avoids potentially reporting incorrect exit codes if the driver fails
-      if (!isClusterMode) {
+      if (!(isClusterMode || sparkConf.get(YARN_UNMANAGED_AM))) {
         logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
         finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
       }
@@ -771,12 +822,28 @@ object ApplicationMaster extends Logging {
   def main(args: Array[String]): Unit = {
     SignalUtils.registerLogger(log)
     val amArgs = new ApplicationMasterArguments(args)
-    master = new ApplicationMaster(amArgs)
+    val sparkConf = new SparkConf()
+    if (amArgs.propertiesFile != null) {
+      Utils.getPropertiesFromFile(amArgs.propertiesFile).foreach { case (k, v) =>
+        sparkConf.set(k, v)
+      }
+    }
+    // Set system properties for each config entry. This covers two use cases:
+    // - The default configuration stored by the SparkHadoopUtil class
+    // - The user application creating a new SparkConf in cluster mode
+    //
+    // Both cases create a new SparkConf object which reads these configs from system properties.
+    sparkConf.getAll.foreach { case (k, v) =>
+      sys.props(k) = v
+    }
+
+    val yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))
+    master = new ApplicationMaster(amArgs, sparkConf, yarnConf)
 
-    val ugi = master.sparkConf.get(PRINCIPAL) match {
+    val ugi = sparkConf.get(PRINCIPAL) match {
       case Some(principal) =>
         val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
-        SparkHadoopUtil.get.loginUserFromKeytab(principal, master.sparkConf.get(KEYTAB).orNull)
+        SparkHadoopUtil.get.loginUserFromKeytab(principal, sparkConf.get(KEYTAB).orNull)
         val newUGI = UserGroupInformation.getCurrentUser()
         // Transfer the original user's tokens to the new user, since it may contain needed tokens
         // (such as those user to connect to YARN).
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 7992292..7582a2b 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -34,7 +34,7 @@ import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.hadoop.fs.permission.FsPermission
-import org.apache.hadoop.io.DataOutputBuffer
+import org.apache.hadoop.io.{DataOutputBuffer, Text}
 import org.apache.hadoop.mapreduce.MRJobConfig
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.util.StringUtils
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication}
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier
 import org.apache.hadoop.yarn.util.Records
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
@@ -55,11 +56,13 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Python._
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils}
+import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.util.{CallerContext, Utils}
 
 private[spark] class Client(
     val args: ClientArguments,
-    val sparkConf: SparkConf)
+    val sparkConf: SparkConf,
+    val rpcEnv: RpcEnv)
   extends Logging {
 
   import Client._
@@ -70,6 +73,10 @@ private[spark] class Client(
 
   private val isClusterMode = sparkConf.get(SUBMIT_DEPLOY_MODE) == "cluster"
 
+  private val isClientUnmanagedAMEnabled = sparkConf.get(YARN_UNMANAGED_AM) && !isClusterMode
+  private var appMaster: ApplicationMaster = _
+  private var stagingDirPath: Path = _
+
   // AM related configurations
   private val amMemory = if (isClusterMode) {
     sparkConf.get(DRIVER_MEMORY).toInt
@@ -133,16 +140,14 @@ private[spark] class Client(
 
   private var appId: ApplicationId = null
 
-  // The app staging dir based on the STAGING_DIR configuration if configured
-  // otherwise based on the users home directory.
-  private val appStagingBaseDir = sparkConf.get(STAGING_DIR).map { new Path(_) }
-    .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())
-
   def reportLauncherState(state: SparkAppHandle.State): Unit = {
     launcherBackend.setState(state)
   }
 
   def stop(): Unit = {
+    if (appMaster != null) {
+      appMaster.stopUnmanaged(stagingDirPath)
+    }
     launcherBackend.close()
     yarnClient.stop()
   }
@@ -171,6 +176,12 @@ private[spark] class Client(
       val newAppResponse = newApp.getNewApplicationResponse()
       appId = newAppResponse.getApplicationId()
 
+      // The app staging dir based on the STAGING_DIR configuration if configured
+      // otherwise based on the users home directory.
+      val appStagingBaseDir = sparkConf.get(STAGING_DIR).map { new Path(_) }
+        .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())
+      stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
+
       new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
         Option(appId.toString)).setCurrentContext()
 
@@ -190,8 +201,8 @@ private[spark] class Client(
       appId
     } catch {
       case e: Throwable =>
-        if (appId != null) {
-          cleanupStagingDir(appId)
+        if (stagingDirPath != null) {
+          cleanupStagingDir()
         }
         throw e
     }
@@ -200,13 +211,12 @@ private[spark] class Client(
   /**
    * Cleanup application staging directory.
    */
-  private def cleanupStagingDir(appId: ApplicationId): Unit = {
+  private def cleanupStagingDir(): Unit = {
     if (sparkConf.get(PRESERVE_STAGING_FILES)) {
       return
     }
 
     def cleanupStagingDirInternal(): Unit = {
-      val stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
       try {
         val fs = stagingDirPath.getFileSystem(hadoopConf)
         if (fs.delete(stagingDirPath, true)) {
@@ -289,7 +299,7 @@ private[spark] class Client(
             "does not support it", e)
       }
     }
-
+    appContext.setUnmanagedAM(isClientUnmanagedAMEnabled)
     appContext
   }
 
@@ -850,7 +860,6 @@ private[spark] class Client(
     : ContainerLaunchContext = {
     logInfo("Setting up container launch context for our AM")
     val appId = newAppResponse.getApplicationId
-    val appStagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
     val pySparkArchives =
       if (sparkConf.get(IS_PYTHON_APP)) {
         findPySparkArchives()
@@ -858,8 +867,8 @@ private[spark] class Client(
         Nil
       }
 
-    val launchEnv = setupLaunchEnv(appStagingDirPath, pySparkArchives)
-    val localResources = prepareLocalResources(appStagingDirPath, pySparkArchives)
+    val launchEnv = setupLaunchEnv(stagingDirPath, pySparkArchives)
+    val localResources = prepareLocalResources(stagingDirPath, pySparkArchives)
 
     val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
     amContainer.setLocalResources(localResources.asJava)
@@ -1041,7 +1050,7 @@ private[spark] class Client(
         } catch {
           case e: ApplicationNotFoundException =>
             logError(s"Application $appId not found.")
-            cleanupStagingDir(appId)
+            cleanupStagingDir()
             return YarnAppReport(YarnApplicationState.KILLED, FinalApplicationStatus.KILLED, None)
           case NonFatal(e) =>
             val msg = s"Failed to contact YARN for application $appId."
@@ -1088,14 +1097,17 @@ private[spark] class Client(
       if (state == YarnApplicationState.FINISHED ||
           state == YarnApplicationState.FAILED ||
           state == YarnApplicationState.KILLED) {
-        cleanupStagingDir(appId)
+        cleanupStagingDir()
         return createAppReport(report)
       }
 
       if (returnOnRunning && state == YarnApplicationState.RUNNING) {
         return createAppReport(report)
       }
-
+      if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled &&
+          appMaster == null && report.getAMRMToken != null) {
+        appMaster = startApplicationMasterService(report)
+      }
       lastState = state
     }
 
@@ -1103,6 +1115,30 @@ private[spark] class Client(
     throw new SparkException("While loop is depleted! This should never happen...")
   }
 
+  private def startApplicationMasterService(report: ApplicationReport): ApplicationMaster = {
+    // Add AMRMToken to establish connection between RM and AM
+    val token = report.getAMRMToken
+    val amRMToken: org.apache.hadoop.security.token.Token[AMRMTokenIdentifier] =
+      new org.apache.hadoop.security.token.Token[AMRMTokenIdentifier](
+        token.getIdentifier().array(), token.getPassword().array,
+        new Text(token.getKind()), new Text(token.getService()))
+    val currentUGI = UserGroupInformation.getCurrentUser
+    currentUGI.addToken(amRMToken)
+
+    // Start Application Service in a separate thread and continue with application monitoring
+    val appMaster = new ApplicationMaster(
+      new ApplicationMasterArguments(Array.empty), sparkConf, hadoopConf)
+    val amService = new Thread("Unmanaged Application Master Service") {
+      override def run(): Unit = {
+        appMaster.runUnmanaged(rpcEnv, report.getCurrentApplicationAttemptId,
+          stagingDirPath, cachedResourcesConf)
+      }
+    }
+    amService.setDaemon(true)
+    amService.start()
+    appMaster
+  }
+
   private def formatReportDetails(report: ApplicationReport): String = {
     val details = Seq[(String, String)](
       ("client token", getClientToken(report)),
@@ -1535,7 +1571,7 @@ private[spark] class YarnClusterApplication extends SparkApplication {
     conf.remove(JARS)
     conf.remove(FILES)
 
-    new Client(new ClientArguments(args), conf).run()
+    new Client(new ClientArguments(args), conf, null).run()
   }
 
 }
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 49a0b93..77ce2f6 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -243,7 +243,7 @@ private[yarn] class ExecutorRunnable(
 
     // Add log urls
     container.foreach { c =>
-      sys.env.get("SPARK_USER").foreach { user =>
+      sys.env.filterKeys(_.endsWith("USER")).foreach { user =>
         val containerId = ConverterUtils.toString(c.getId)
         val address = c.getNodeHttpAddress
         val baseUrl = s"$httpScheme$address/node/containerlogs/$containerId/$user"
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index 6091cd4..16adaec 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -236,6 +236,14 @@ package object config {
       .stringConf
       .createOptional
 
+  /* Unmanaged AM configuration. */
+
+  private[spark] val YARN_UNMANAGED_AM = ConfigBuilder("spark.yarn.unmanagedAM.enabled")
+    .doc("In client mode, whether to launch the Application Master service as part of the client " +
+      "using unmanaged am.")
+    .booleanConf
+    .createWithDefault(false)
+
   /* Security configuration. */
 
   private[spark] val NAMENODES_TO_ACCESS = ConfigBuilder("spark.yarn.access.namenodes")
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 934fba3..7b77f8c 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -54,7 +54,7 @@ private[spark] class YarnClientSchedulerBackend(
     logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" "))
     val args = new ClientArguments(argsArrayBuf.toArray)
     totalExpectedExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
-    client = new Client(args, conf)
+    client = new Client(args, conf, sc.env.rpcEnv)
     bindToYarn(client.submitApplication(), None)
 
     // SPARK-8687: Ensure all necessary properties have already been set before
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 25827fd..5ff8e06 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -185,7 +185,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
     val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
     val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext])
 
-    val client = new Client(args, sparkConf)
+    val client = new Client(args, sparkConf, null)
     client.createApplicationSubmissionContext(
       new YarnClientApplication(getNewApplicationResponse, appContext),
       containerLaunchContext)
@@ -378,7 +378,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
       val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
       val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext])
 
-      val client = new Client(new ClientArguments(Array()), conf)
+      val client = new Client(new ClientArguments(Array()), conf, null)
       client.createApplicationSubmissionContext(
         new YarnClientApplication(getNewApplicationResponse, appContext),
         containerLaunchContext)
@@ -456,7 +456,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
       sparkConf: SparkConf,
       args: Array[String] = Array()): Client = {
     val clientArgs = new ClientArguments(args)
-    spy(new Client(clientArgs, sparkConf))
+    spy(new Client(clientArgs, sparkConf, null))
   }
 
   private def classpath(client: Client): Array[String] = {
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index faddb8f..70ad231 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -89,6 +89,10 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     testBasicYarnApp(false)
   }
 
+  test("run Spark in yarn-client mode with unmanaged am") {
+    testBasicYarnApp(true, Map(YARN_UNMANAGED_AM.key -> "true"))
+  }
+
   test("run Spark in yarn-client mode with different configurations, ensuring redaction") {
     testBasicYarnApp(true,
       Map(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org