You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2021/11/28 14:54:50 UTC

[spark] branch master updated: [SPARK-37461][YARN] YARN-CLIENT mode client.appId is always null

This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new db9a982  [SPARK-37461][YARN] YARN-CLIENT mode client.appId is always null
db9a982 is described below

commit db9a982a1441810314be07e2c3b7ccffff77d1f1
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Sun Nov 28 08:53:25 2021 -0600

    [SPARK-37461][YARN] YARN-CLIENT mode client.appId is always null
    
    ### What changes were proposed in this pull request?
    In yarn-client mode, the `Client.appId` variable is not assigned, so it is always `null`; in cluster mode, this variable is assigned the true value. In this patch, we assign the true application id to `appId` in client mode as well.
    
    ### Why are the changes needed?
    
    1. Refactor the code to avoid defining a separate id in each function; we can just use this variable.
    2. In client mode, users can use this value to get the application id.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Manually tested.
    
    We have an internal proxy server that replaces the YARN tracking URL; it uses `appId` here, and with this patch it is no longer null.
    
    ```
    21/11/26 12:38:44 INFO Client:
    	 client token: N/A
    	 diagnostics: AM container is launched, waiting for AM container to Register with RM
    	 ApplicationMaster host: N/A
    	 ApplicationMaster RPC port: -1
    	 queue: user_queue
    	 start time: 1637901520956
    	 final status: UNDEFINED
    	 tracking URL: http://internal-proxy-server/proxy?applicationId=application_1635856758535_4209064
    	 user: user_name
    ```
    
    Closes #34710 from AngersZhuuuu/SPARK-37461.
    
    Authored-by: Angerszhuuuu <an...@gmail.com>
    Signed-off-by: Sean Owen <sr...@gmail.com>
---
 .../main/scala/org/apache/spark/deploy/yarn/Client.scala    | 13 +++++--------
 1 file changed, 5 insertions(+), 8 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 7787e2f..e6136fc 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -169,7 +169,6 @@ private[spark] class Client(
   def submitApplication(): ApplicationId = {
     ResourceRequestHelper.validateResources(sparkConf)
 
-    var appId: ApplicationId = null
     try {
       launcherBackend.connect()
       yarnClient.init(hadoopConf)
@@ -181,7 +180,7 @@ private[spark] class Client(
       // Get a new application from our RM
       val newApp = yarnClient.createApplication()
       val newAppResponse = newApp.getNewApplicationResponse()
-      appId = newAppResponse.getApplicationId()
+      this.appId = newAppResponse.getApplicationId()
 
       // The app staging dir based on the STAGING_DIR configuration if configured
       // otherwise based on the users home directory.
@@ -207,8 +206,7 @@ private[spark] class Client(
       yarnClient.submitApplication(appContext)
       launcherBackend.setAppId(appId.toString)
       reportLauncherState(SparkAppHandle.State.SUBMITTED)
-
-      appId
+      this.appId
     } catch {
       case e: Throwable =>
         if (stagingDirPath != null) {
@@ -915,7 +913,6 @@ private[spark] class Client(
   private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
     : ContainerLaunchContext = {
     logInfo("Setting up container launch context for our AM")
-    val appId = newAppResponse.getApplicationId
     val pySparkArchives =
       if (sparkConf.get(IS_PYTHON_APP)) {
         findPySparkArchives()
@@ -971,7 +968,7 @@ private[spark] class Client(
     if (isClusterMode) {
       sparkConf.get(DRIVER_JAVA_OPTIONS).foreach { opts =>
         javaOpts ++= Utils.splitCommandString(opts)
-          .map(Utils.substituteAppId(_, appId.toString))
+          .map(Utils.substituteAppId(_, this.appId.toString))
           .map(YarnSparkHadoopUtil.escapeForShell)
       }
       val libraryPaths = Seq(sparkConf.get(DRIVER_LIBRARY_PATH),
@@ -996,7 +993,7 @@ private[spark] class Client(
           throw new SparkException(msg)
         }
         javaOpts ++= Utils.splitCommandString(opts)
-          .map(Utils.substituteAppId(_, appId.toString))
+          .map(Utils.substituteAppId(_, this.appId.toString))
           .map(YarnSparkHadoopUtil.escapeForShell)
       }
       sparkConf.get(AM_LIBRARY_PATH).foreach { paths =>
@@ -1269,7 +1266,7 @@ private[spark] class Client(
    * throw an appropriate SparkException.
    */
   def run(): Unit = {
-    this.appId = submitApplication()
+    submitApplication()
     if (!launcherBackend.isConnected() && fireAndForget) {
       val report = getApplicationReport(appId)
       val state = report.getYarnApplicationState

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org