Posted to commits@spark.apache.org by mr...@apache.org on 2020/11/05 18:40:37 UTC

[spark] branch master updated: [SPARK-33185][YARN] Set up yarn.Client to print direct links to driver stdout/stderr

This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 324275a  [SPARK-33185][YARN] Set up yarn.Client to print direct links to driver stdout/stderr
324275a is described below

commit 324275ae8350ec15844ce384f40f1ecc4acdc072
Author: Erik Krogen <xk...@apache.org>
AuthorDate: Thu Nov 5 12:38:42 2020 -0600

    [SPARK-33185][YARN] Set up yarn.Client to print direct links to driver stdout/stderr
    
    ### What changes were proposed in this pull request?
    Currently, when running in `cluster` mode on YARN, the Spark `yarn.Client` prints the application report to the logs so that users can view it easily. For example:
    ```
    INFO yarn.Client:
     	 client token: Token { kind: YARN_CLIENT_TOKEN, service:  }
     	 diagnostics: N/A
     	 ApplicationMaster host: X.X.X.X
     	 ApplicationMaster RPC port: 0
     	 queue: default
     	 start time: 1602782566027
     	 final status: UNDEFINED
     	 tracking URL: http://hostname:8888/proxy/application_<id>/
     	 user: xkrogen
    ```
    
    I propose adding, alongside the application report, some additional lines like:
    ```
             Driver Logs (stdout): http://hostname:8042/node/containerlogs/container_<id>/xkrogen/stdout?start=-4096
             Driver Logs (stderr): http://hostname:8042/node/containerlogs/container_<id>/xkrogen/stderr?start=-4096
    ```
    
    This information isn't contained in the `ApplicationReport`, so it's necessary to query the ResourceManager REST API. Because this adds a REST dependency and places some additional load on the RM, the feature is gated behind the `spark.yarn.includeDriverLogsLink` flag, which is off by default.
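    
    As a rough illustration of the link shape (a minimal sketch, not the code from this patch): given a container-logs base URL, such as the `logsLink` field that the RM returns for an app attempt, the two driver log links can be derived by appending the file name and a tail offset. The object name `DriverLogLinksSketch`, the method name, and the host in the usage note are hypothetical.
    
    ```scala
    object DriverLogLinksSketch {
      // File names and tail offset follow the values used in this patch.
      private val logFileNames = Seq("stdout", "stderr")
      private val startOffset = -4096

      // Build one link per log file from the container-logs base URL.
      def logUrlsFromBaseUrl(baseUrl: String): Map[String, String] = {
        logFileNames.map { fname =>
          fname -> s"$baseUrl/$fname?start=$startOffset"
        }.toMap
      }
    }
    ```
    
    For instance, calling `DriverLogLinksSketch.logUrlsFromBaseUrl("http://host.example.com:8042/node/containerlogs/container_<id>/xkrogen")` would produce links of the same form as the two lines shown above.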
    
    ### Why are the changes needed?
    Typically, the tracking URL can be used to find the logs of the ApplicationMaster/driver while the application is running. Later, the Spark History Server can be used to track this information down, using the stdout/stderr links on the Executors page.
    
    However, if the driver crashes _before_ writing out a history file, the SHS may not be aware of the application and thus will not contain links to the driver logs. In that case, it can be difficult for users to debug further, since they can't easily find their driver logs.
    
    It is possible to reach the logs by using the `yarn logs` command, but the average Spark user isn't aware of this and shouldn't have to be.
    
    With this information readily available in the logs, users can quickly jump to their driver logs, even if the driver crashed before the SHS became aware of the application. This has the additional benefit of providing single-click access to the driver logs, which often contain useful information, instead of requiring users to navigate through the Spark UI.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. When `spark.yarn.includeDriverLogsLink` is enabled, the application report printed in YARN cluster mode includes additional lines with links to the driver logs.
    
    ### How was this patch tested?
    Added unit tests for the parsing logic in `yarn.ClientSuite`. Also tested against a live cluster. When the driver is running:
    ```
    INFO Client: Application report for application_XXXXXXXXX_YYYYYY (state: RUNNING)
    INFO Client:
             client token: Token { kind: YARN_CLIENT_TOKEN, service:  }
             diagnostics: N/A
             ApplicationMaster host: host.example.com
             ApplicationMaster RPC port: ######
             queue: queue_name
             start time: 1604529046091
             final status: UNDEFINED
             tracking URL: http://host.example.com:8080/proxy/application_XXXXXXXXX_YYYYYY/
             user: xkrogen
             Driver Logs (stdout): http://host.example.com:8042/node/containerlogs/container_e07_XXXXXXXXX_YYYYYY_01_000001/xkrogen/stdout?start=-4096
             Driver Logs (stderr): http://host.example.com:8042/node/containerlogs/container_e07_XXXXXXXXX_YYYYYY_01_000001/xkrogen/stderr?start=-4096
    INFO Client: Application report for application_XXXXXXXXX_YYYYYY (state: RUNNING)
    ```
    I confirmed that when the driver has not yet launched, the report does not include the two Driver Logs items. That output is omitted here for brevity, since it is otherwise identical.
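    
    The behavior described above, where an empty set of links adds nothing to the report, can be sketched in isolation. This is a simplified stand-in rather than the patched `formatReportDetails`: the name `ReportFormatSketch` is hypothetical, the links are passed as a `Seq` (not a `Map`) so that the output order is explicit, and the `N/A` substitution for null or empty values is an assumption based on the sample output.
    
    ```scala
    object ReportFormatSketch {
      // Append one "Driver Logs (<file>)" entry per link, then render every entry on its own line.
      def formatDetails(
          base: Seq[(String, String)],
          driverLogsLinks: Seq[(String, String)]): String = {
        val details = base ++ driverLogsLinks.map { case (fname, link) =>
          (s"Driver Logs ($fname)", link)
        }
        details.map { case (k, v) =>
          // Assumption: null or empty values are shown as N/A.
          val shown = Option(v).filter(_.nonEmpty).getOrElse("N/A")
          s"\n\t $k: $shown"
        }.mkString("")
      }
    }
    ```
    
    With an empty `driverLogsLinks`, the result contains only the base entries, which matches the report seen before the driver launches.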
    
    Closes #30096 from xkrogen/xkrogen-SPARK-33185-yarn-client-print.
    
    Authored-by: Erik Krogen <xk...@apache.org>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../org/apache/spark/deploy/yarn/Client.scala      | 73 ++++++++++++++++++++--
 .../org/apache/spark/deploy/yarn/config.scala      |  9 +++
 .../spark/util/YarnContainerInfoHelper.scala       | 14 ++++-
 .../org/apache/spark/deploy/yarn/ClientSuite.scala | 47 ++++++++++++++
 4 files changed, 134 insertions(+), 9 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 517a4af..30ca4a6 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -25,11 +25,16 @@ import java.util.{Locale, Properties, UUID}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.JavaConverters._
+import scala.collection.immutable.{Map => IMap}
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map}
 import scala.util.control.NonFatal
 
+import com.fasterxml.jackson.databind.ObjectMapper
 import com.google.common.base.Objects
 import com.google.common.io.Files
+import javax.ws.rs.client.ClientBuilder
+import javax.ws.rs.core.MediaType
+import javax.ws.rs.core.Response.Status.Family
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.hadoop.fs.permission.FsPermission
@@ -46,6 +51,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier
 import org.apache.hadoop.yarn.util.Records
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.api.python.PythonUtils
@@ -58,7 +64,7 @@ import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Python._
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils}
 import org.apache.spark.rpc.RpcEnv
-import org.apache.spark.util.{CallerContext, Utils}
+import org.apache.spark.util.{CallerContext, Utils, YarnContainerInfoHelper}
 
 private[spark] class Client(
     val args: ClientArguments,
@@ -1080,9 +1086,9 @@ private[spark] class Client(
         // If DEBUG is enabled, log report details every iteration
         // Otherwise, log them every time the application changes state
         if (log.isDebugEnabled) {
-          logDebug(formatReportDetails(report))
+          logDebug(formatReportDetails(report, getDriverLogsLink(report.getApplicationId)))
         } else if (lastState != state) {
-          logInfo(formatReportDetails(report))
+          logInfo(formatReportDetails(report, getDriverLogsLink(report.getApplicationId)))
         }
       }
 
@@ -1152,7 +1158,17 @@ private[spark] class Client(
     appMaster
   }
 
-  private def formatReportDetails(report: ApplicationReport): String = {
+  /**
+   * Format an application report and, optionally, links to driver logs in a human-friendly manner.
+   *
+   * @param report The application report from YARN.
+   * @param driverLogsLinks A map of driver log files and their links. Keys are the file names
+   *                        (e.g. `stdout`), and values are the links. If empty, nothing will be
+   *                        printed.
+   * @return Human-readable version of the input data.
+   */
+  private def formatReportDetails(report: ApplicationReport,
+    driverLogsLinks: IMap[String, String]): String = {
     val details = Seq[(String, String)](
       ("client token", getClientToken(report)),
       ("diagnostics", report.getDiagnostics),
@@ -1163,7 +1179,7 @@ private[spark] class Client(
       ("final status", report.getFinalApplicationStatus.toString),
       ("tracking URL", report.getTrackingUrl),
       ("user", report.getUser)
-    )
+    ) ++ driverLogsLinks.map { case (fname, link) => (s"Driver Logs ($fname)", link) }
 
     // Use more loggable format if value is null or empty
     details.map { case (k, v) =>
@@ -1173,6 +1189,37 @@ private[spark] class Client(
   }
 
   /**
+   * Fetch links to the logs of the driver for the given application ID. This requires hitting the
+   * RM REST API. Returns an empty map if the links could not be fetched. If this feature is
+   * disabled via [[CLIENT_INCLUDE_DRIVER_LOGS_LINK]], an empty map is returned immediately.
+   */
+  private def getDriverLogsLink(appId: ApplicationId): IMap[String, String] = {
+    if (!sparkConf.get(CLIENT_INCLUDE_DRIVER_LOGS_LINK)) {
+      return IMap()
+    }
+    try {
+      val baseRmUrl = WebAppUtils.getRMWebAppURLWithScheme(hadoopConf)
+      val response = ClientBuilder.newClient()
+          .target(baseRmUrl)
+          .path("ws").path("v1").path("cluster").path("apps")
+          .path(appId.toString).path("appattempts")
+          .request(MediaType.APPLICATION_JSON)
+          .get()
+      response.getStatusInfo.getFamily match {
+        case Family.SUCCESSFUL => parseAppAttemptsJsonResponse(response.readEntity(classOf[String]))
+        case _ =>
+          logWarning(s"Unable to fetch app attempts info from $baseRmUrl, got "
+              + s"status code ${response.getStatus}: ${response.getStatusInfo.getReasonPhrase}")
+          IMap()
+      }
+    } catch {
+      case e: Exception =>
+        logWarning(s"Unable to get driver log links for $appId", e)
+        IMap()
+    }
+  }
+
+  /**
    * Submit an application to the ResourceManager.
    * If set spark.yarn.submit.waitAppCompletion to true, it will stay alive
    * reporting the application's status until the application has exited for any reason.
@@ -1186,7 +1233,7 @@ private[spark] class Client(
       val report = getApplicationReport(appId)
       val state = report.getYarnApplicationState
       logInfo(s"Application report for $appId (state: $state)")
-      logInfo(formatReportDetails(report))
+      logInfo(formatReportDetails(report, getDriverLogsLink(report.getApplicationId)))
       if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
         throw new SparkException(s"Application $appId finished with status: $state")
       }
@@ -1577,6 +1624,20 @@ private object Client extends Logging {
     writer.flush()
     out.closeEntry()
   }
+
+  private[yarn] def parseAppAttemptsJsonResponse(jsonString: String): IMap[String, String] = {
+    val objectMapper = new ObjectMapper()
+    // If JSON response is malformed somewhere along the way, MissingNode will be returned,
+    // which allows for safe continuation of chaining. The `elements()` call will be empty,
+    // so `headOption` yields None and an empty map is returned.
+    objectMapper.readTree(jsonString)
+      .path("appAttempts").path("appAttempt")
+      .elements().asScala.toList.takeRight(1).headOption
+      .map(_.path("logsLink").asText(""))
+      .filterNot(_ == "")
+      .map(baseUrl => YarnContainerInfoHelper.getLogUrlsFromBaseUrl(baseUrl))
+      .getOrElse(IMap())
+  }
 }
 
 private[spark] class YarnClusterApplication extends SparkApplication {
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index f2e838f..89a4af2 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -189,6 +189,15 @@ package object config extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("1s")
 
+  private[spark] val CLIENT_INCLUDE_DRIVER_LOGS_LINK =
+    ConfigBuilder("spark.yarn.includeDriverLogsLink")
+      .doc("In cluster mode, whether the client application report includes links to the driver "
+          + "container's logs. This requires polling the ResourceManager's REST API, so it "
+          + "places some additional load on the RM.")
+      .version("3.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
   /* Shared Client-mode AM / Driver configuration. */
 
   private[spark] val AM_MAX_WAIT_TIME = ConfigBuilder("spark.yarn.am.waitTime")
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/util/YarnContainerInfoHelper.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/util/YarnContainerInfoHelper.scala
index 5e39422..854fe18 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/util/YarnContainerInfoHelper.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/util/YarnContainerInfoHelper.scala
@@ -28,6 +28,16 @@ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
 import org.apache.spark.internal.Logging
 
 private[spark] object YarnContainerInfoHelper extends Logging {
+
+  private[this] val DRIVER_LOG_FILE_NAMES = Seq("stdout", "stderr")
+  private[this] val DRIVER_LOG_START_OFFSET = -4096
+
+  def getLogUrlsFromBaseUrl(baseUrl: String): Map[String, String] = {
+    DRIVER_LOG_FILE_NAMES.map { fname =>
+      fname -> s"$baseUrl/$fname?start=$DRIVER_LOG_START_OFFSET"
+    }.toMap
+  }
+
   def getLogUrls(
       conf: Configuration,
       container: Option[Container]): Option[Map[String, String]] = {
@@ -42,9 +52,7 @@ private[spark] object YarnContainerInfoHelper extends Logging {
       val baseUrl = s"$httpScheme$httpAddress/node/containerlogs/$containerId/$user"
       logDebug(s"Base URL for logs: $baseUrl")
 
-      Some(Map(
-        "stdout" -> s"$baseUrl/stdout?start=-4096",
-        "stderr" -> s"$baseUrl/stderr?start=-4096"))
+      Some(getLogUrlsFromBaseUrl(baseUrl))
     } catch {
       case e: Exception =>
         logInfo("Error while building executor logs - executor logs will not be available", e)
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index ea3acec..fccb240 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -583,6 +583,53 @@ class ClientSuite extends SparkFunSuite with Matchers {
     }
   }
 
+  test("SPARK-33185 Parse YARN AppAttempts valid JSON response") {
+    val appIdSuffix = "1500000000000_1234567"
+    val containerId = s"container_e1_${appIdSuffix}_01_000001"
+    val nodeHost = "node.example.com"
+    val jsonString =
+      s"""
+        |{"appAttempts": {
+        |  "appAttempt": [ {
+        |    "id":1,
+        |    "startTime":1600000000000,
+        |    "finishedTime":1600000100000,
+        |    "containerId":"$containerId",
+        |    "nodeHttpAddress":"$nodeHost:8042",
+        |    "nodeId":"node.example.com:8041",
+        |    "logsLink":"http://$nodeHost:8042/node/containerlogs/$containerId/username",
+        |    "blacklistedNodes":"",
+        |    "nodesBlacklistedBySystem":"",
+        |    "appAttemptId":"appattempt_${appIdSuffix}_000001"
+        |  }]
+        |}}
+        |""".stripMargin
+    val logLinkMap = Client.parseAppAttemptsJsonResponse(jsonString)
+    assert(logLinkMap.keySet === Set("stdout", "stderr"))
+    assert(logLinkMap("stdout") ===
+        s"http://$nodeHost:8042/node/containerlogs/$containerId/username/stdout?start=-4096")
+    assert(logLinkMap("stderr") ===
+        s"http://$nodeHost:8042/node/containerlogs/$containerId/username/stderr?start=-4096")
+  }
+
+  test("SPARK-33185 Parse YARN AppAttempts invalid JSON response") {
+    // No "appAttempt" present
+    assert(Client.parseAppAttemptsJsonResponse("""{"appAttempts": { } }""") === Map())
+
+    // "appAttempt" is empty
+    assert(Client.parseAppAttemptsJsonResponse("""{"appAttempts": { "appAttempt": [ ] } }""")
+        === Map())
+
+    // logsLink is missing
+    assert(Client.parseAppAttemptsJsonResponse("""{"appAttempts":{"appAttempt":[{"id":1}]}}""")
+        === Map())
+
+    // logsLink is present but empty
+    assert(
+      Client.parseAppAttemptsJsonResponse("""{"appAttempts":{"appAttempt":[{"logsLink":""}]}}""")
+          === Map())
+  }
+
   private val matching = Seq(
     ("files URI match test1", "file:///file1", "file:///file2"),
     ("files URI match test2", "file:///c:file1", "file://c:file2"),

